mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 10:02:01 +00:00
Merge pull request #60510 from ClickHouse/refactor-system-one-block
Refactor StorageSystemOneBlock
This commit is contained in:
commit
b040eadaa1
@ -58,6 +58,7 @@
|
||||
#include <Storages/System/StorageSystemFilesystemCache.h>
|
||||
#include <Parsers/ASTSystemQuery.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Processors/Sources/SourceFromSingleChunk.h>
|
||||
#include <Common/ThreadFuzzer.h>
|
||||
#include <base/coverage.h>
|
||||
#include <csignal>
|
||||
|
@ -39,7 +39,7 @@ ColumnsDescription StorageSystemRocksDB::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemRocksDB::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const
|
||||
void StorageSystemRocksDB::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const
|
||||
{
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
||||
@ -87,7 +87,7 @@ void StorageSystemRocksDB::fillData(MutableColumns & res_columns, ContextPtr con
|
||||
{ col_table_to_filter, std::make_shared<DataTypeString>(), "table" },
|
||||
};
|
||||
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context);
|
||||
|
||||
if (!filtered_block.rows())
|
||||
return;
|
||||
|
@ -11,7 +11,7 @@ class Context;
|
||||
|
||||
/** Implements the `rocksdb` system table, which expose various rocksdb metrics.
|
||||
*/
|
||||
class StorageSystemRocksDB final : public IStorageSystemOneBlock<StorageSystemRocksDB>
|
||||
class StorageSystemRocksDB final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemRocksDB"; }
|
||||
@ -21,7 +21,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -63,10 +63,12 @@
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
using namespace DB;
|
||||
bool columnIsPhysical(ColumnDefaultKind kind)
|
||||
{
|
||||
return kind == ColumnDefaultKind::Default || kind == ColumnDefaultKind::Materialized;
|
||||
@ -82,10 +84,23 @@ bool columnDefaultKindHasSameType(ColumnDefaultKind lhs, ColumnDefaultKind rhs)
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Adds to the select query section `WITH value AS column_name`
|
||||
///
|
||||
/// For example:
|
||||
/// - `WITH 9000 as _port`.
|
||||
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value)
|
||||
{
|
||||
auto & select = ast->as<ASTSelectQuery &>();
|
||||
if (!select.with())
|
||||
select.setExpression(ASTSelectQuery::Expression::WITH, std::make_shared<ASTExpressionList>());
|
||||
|
||||
auto literal = std::make_shared<ASTLiteral>(value);
|
||||
literal->alias = column_name;
|
||||
literal->prefer_alias_to_column_name = true;
|
||||
select.with()->children.push_back(literal);
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
}
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
@ -928,8 +943,8 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextPtr & modified_
|
||||
|
||||
if (!is_storage_merge_engine)
|
||||
{
|
||||
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", current_storage_id.table_name);
|
||||
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_database", current_storage_id.database_name);
|
||||
rewriteEntityInAst(modified_query_info.query, "_table", current_storage_id.table_name);
|
||||
rewriteEntityInAst(modified_query_info.query, "_database", current_storage_id.database_name);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,9 @@
|
||||
#include <Access/Common/AccessFlags.h>
|
||||
#include <Common/HashTable/HashSet.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/SourceStepWithFilter.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -216,7 +219,6 @@ StorageMergeTreeIndex::StorageMergeTreeIndex(
|
||||
: IStorage(table_id_)
|
||||
, source_table(source_table_)
|
||||
, with_marks(with_marks_)
|
||||
, log(&Poco::Logger::get("StorageMergeTreeIndex"))
|
||||
{
|
||||
const auto * merge_tree = dynamic_cast<const MergeTreeData *>(source_table.get());
|
||||
if (!merge_tree)
|
||||
@ -230,7 +232,47 @@ StorageMergeTreeIndex::StorageMergeTreeIndex(
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
|
||||
Pipe StorageMergeTreeIndex::read(
|
||||
class ReadFromMergeTreeIndex : public SourceStepWithFilter
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "ReadFromMergeTreeIndex"; }
|
||||
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
|
||||
ReadFromMergeTreeIndex(
|
||||
const Names & column_names_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const ContextPtr & context_,
|
||||
Block sample_block,
|
||||
std::shared_ptr<StorageMergeTreeIndex> storage_)
|
||||
: SourceStepWithFilter(
|
||||
DataStream{.header = std::move(sample_block)},
|
||||
column_names_,
|
||||
query_info_,
|
||||
storage_snapshot_,
|
||||
context_)
|
||||
, storage(std::move(storage_))
|
||||
, log(&Poco::Logger::get("StorageMergeTreeIndex"))
|
||||
{
|
||||
}
|
||||
|
||||
void applyFilters(ActionDAGNodes added_filter_nodes) override;
|
||||
|
||||
private:
|
||||
std::shared_ptr<StorageMergeTreeIndex> storage;
|
||||
Poco::Logger * log;
|
||||
const ActionsDAG::Node * predicate = nullptr;
|
||||
};
|
||||
|
||||
void ReadFromMergeTreeIndex::applyFilters(ActionDAGNodes added_filter_nodes)
|
||||
{
|
||||
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
|
||||
if (filter_actions_dag)
|
||||
predicate = filter_actions_dag->getOutputs().at(0);
|
||||
}
|
||||
|
||||
void StorageMergeTreeIndex::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -265,21 +307,32 @@ Pipe StorageMergeTreeIndex::read(
|
||||
|
||||
context->checkAccess(AccessType::SELECT, source_table->getStorageID(), columns_from_storage);
|
||||
|
||||
auto header = storage_snapshot->getSampleBlockForColumns(column_names);
|
||||
auto filtered_parts = getFilteredDataParts(query_info, context);
|
||||
auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names);
|
||||
|
||||
LOG_DEBUG(log, "Reading index{}from {} parts of table {}",
|
||||
with_marks ? " with marks " : " ",
|
||||
filtered_parts.size(),
|
||||
source_table->getStorageID().getNameForLogs());
|
||||
auto this_ptr = std::static_pointer_cast<StorageMergeTreeIndex>(shared_from_this());
|
||||
|
||||
return Pipe(std::make_shared<MergeTreeIndexSource>(std::move(header), key_sample_block, std::move(filtered_parts), context, with_marks));
|
||||
auto reading = std::make_unique<ReadFromMergeTreeIndex>(
|
||||
column_names, query_info, storage_snapshot,
|
||||
std::move(context), std::move(sample_block), std::move(this_ptr));
|
||||
|
||||
query_plan.addStep(std::move(reading));
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector StorageMergeTreeIndex::getFilteredDataParts(SelectQueryInfo & query_info, const ContextPtr & context) const
|
||||
void ReadFromMergeTreeIndex::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
const auto * select_query = query_info.query->as<ASTSelectQuery>();
|
||||
if (!select_query || !select_query->where())
|
||||
auto filtered_parts = storage->getFilteredDataParts(predicate, context);
|
||||
|
||||
LOG_DEBUG(log, "Reading index{}from {} parts of table {}",
|
||||
storage->with_marks ? " with marks " : " ",
|
||||
filtered_parts.size(),
|
||||
storage->source_table->getStorageID().getNameForLogs());
|
||||
|
||||
pipeline.init(Pipe(std::make_shared<MergeTreeIndexSource>(getOutputStream().header, storage->key_sample_block, std::move(filtered_parts), context, storage->with_marks)));
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector StorageMergeTreeIndex::getFilteredDataParts(const ActionsDAG::Node * predicate, const ContextPtr & context) const
|
||||
{
|
||||
if (!predicate)
|
||||
return data_parts;
|
||||
|
||||
auto all_part_names = ColumnString::create();
|
||||
@ -287,7 +340,7 @@ MergeTreeData::DataPartsVector StorageMergeTreeIndex::getFilteredDataParts(Selec
|
||||
all_part_names->insert(part->name);
|
||||
|
||||
Block filtered_block{{std::move(all_part_names), std::make_shared<DataTypeString>(), part_name_column.name}};
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context);
|
||||
|
||||
if (!filtered_block.rows())
|
||||
return {};
|
||||
|
@ -21,7 +21,8 @@ public:
|
||||
const ColumnsDescription & columns,
|
||||
bool with_marks_);
|
||||
|
||||
Pipe read(
|
||||
void read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -33,14 +34,15 @@ public:
|
||||
String getName() const override { return "MergeTreeIndex"; }
|
||||
|
||||
private:
|
||||
MergeTreeData::DataPartsVector getFilteredDataParts(SelectQueryInfo & query_info, const ContextPtr & context) const;
|
||||
friend class ReadFromMergeTreeIndex;
|
||||
|
||||
MergeTreeData::DataPartsVector getFilteredDataParts(const ActionsDAG::Node * predicate, const ContextPtr & context) const;
|
||||
|
||||
StoragePtr source_table;
|
||||
bool with_marks;
|
||||
|
||||
MergeTreeData::DataPartsVector data_parts;
|
||||
Block key_sample_block;
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
}
|
||||
|
100
src/Storages/System/IStorageSystemOneBlock.cpp
Normal file
100
src/Storages/System/IStorageSystemOneBlock.cpp
Normal file
@ -0,0 +1,100 @@
|
||||
#include <Storages/System/IStorageSystemOneBlock.h>
|
||||
// #include <Core/NamesAndAliases.h>
|
||||
// #include <DataTypes/DataTypeString.h>
|
||||
// #include <Storages/ColumnsDescription.h>
|
||||
// #include <Storages/IStorage.h>
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
|
||||
#include <Processors/Sources/SourceFromSingleChunk.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/SourceStepWithFilter.h>
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ReadFromSystemOneBlock : public SourceStepWithFilter
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "ReadFromSystemOneBlock"; }
|
||||
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
|
||||
ReadFromSystemOneBlock(
|
||||
const Names & column_names_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const ContextPtr & context_,
|
||||
Block sample_block,
|
||||
std::shared_ptr<IStorageSystemOneBlock> storage_,
|
||||
std::vector<UInt8> columns_mask_)
|
||||
: SourceStepWithFilter(
|
||||
DataStream{.header = std::move(sample_block)},
|
||||
column_names_,
|
||||
query_info_,
|
||||
storage_snapshot_,
|
||||
context_)
|
||||
, storage(std::move(storage_))
|
||||
, columns_mask(std::move(columns_mask_))
|
||||
{
|
||||
}
|
||||
|
||||
void applyFilters(ActionDAGNodes added_filter_nodes) override;
|
||||
|
||||
private:
|
||||
std::shared_ptr<IStorageSystemOneBlock> storage;
|
||||
std::vector<UInt8> columns_mask;
|
||||
const ActionsDAG::Node * predicate = nullptr;
|
||||
};
|
||||
|
||||
void IStorageSystemOneBlock::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t /*max_block_size*/,
|
||||
size_t /*num_streams*/)
|
||||
|
||||
{
|
||||
storage_snapshot->check(column_names);
|
||||
Block sample_block = storage_snapshot->metadata->getSampleBlockWithVirtuals(getVirtuals());
|
||||
std::vector<UInt8> columns_mask;
|
||||
|
||||
if (supportsColumnsMask())
|
||||
{
|
||||
auto [columns_mask_, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
|
||||
columns_mask = std::move(columns_mask_);
|
||||
sample_block = std::move(header);
|
||||
}
|
||||
|
||||
auto this_ptr = std::static_pointer_cast<IStorageSystemOneBlock>(shared_from_this());
|
||||
|
||||
auto reading = std::make_unique<ReadFromSystemOneBlock>(
|
||||
column_names, query_info, storage_snapshot,
|
||||
std::move(context), std::move(sample_block), std::move(this_ptr), std::move(columns_mask));
|
||||
|
||||
query_plan.addStep(std::move(reading));
|
||||
}
|
||||
|
||||
void ReadFromSystemOneBlock::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
const auto & sample_block = getOutputStream().header;
|
||||
MutableColumns res_columns = sample_block.cloneEmptyColumns();
|
||||
storage->fillData(res_columns, context, predicate, std::move(columns_mask));
|
||||
|
||||
UInt64 num_rows = res_columns.at(0)->size();
|
||||
Chunk chunk(std::move(res_columns), num_rows);
|
||||
|
||||
pipeline.init(Pipe(std::make_shared<SourceFromSingleChunk>(sample_block, std::move(chunk))));
|
||||
}
|
||||
|
||||
void ReadFromSystemOneBlock::applyFilters(ActionDAGNodes added_filter_nodes)
|
||||
{
|
||||
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
|
||||
if (filter_actions_dag)
|
||||
predicate = filter_actions_dag->getOutputs().at(0);
|
||||
}
|
||||
|
||||
}
|
@ -1,13 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/NamesAndAliases.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
|
||||
#include <Processors/Sources/SourceFromSingleChunk.h>
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -17,8 +10,8 @@ class Context;
|
||||
|
||||
/** IStorageSystemOneBlock is base class for system tables whose all columns can be synchronously fetched.
|
||||
*
|
||||
* Client class need to provide static method static NamesAndTypesList getNamesAndTypes() that will return list of column names and
|
||||
* their types. IStorageSystemOneBlock during read will create result columns in same order as result of getNamesAndTypes
|
||||
* Client class need to provide columns_description.
|
||||
* IStorageSystemOneBlock during read will create result columns in same order as in columns_description
|
||||
* and pass it with fillData method.
|
||||
*
|
||||
* Client also must override fillData and fill result columns.
|
||||
@ -26,49 +19,32 @@ class Context;
|
||||
* If subclass want to support virtual columns, it should override getVirtuals method of IStorage interface.
|
||||
* IStorageSystemOneBlock will add virtuals columns at the end of result columns of fillData method.
|
||||
*/
|
||||
template <typename Self>
|
||||
class IStorageSystemOneBlock : public IStorage
|
||||
{
|
||||
protected:
|
||||
virtual void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const = 0;
|
||||
virtual void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8> columns_mask) const = 0;
|
||||
|
||||
virtual bool supportsColumnsMask() const { return false; }
|
||||
|
||||
friend class ReadFromSystemOneBlock;
|
||||
|
||||
public:
|
||||
explicit IStorageSystemOneBlock(const StorageID & table_id_) : IStorage(table_id_)
|
||||
explicit IStorageSystemOneBlock(const StorageID & table_id_, ColumnsDescription columns_description) : IStorage(table_id_)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(Self::getColumnsDescription());
|
||||
storage_metadata.setColumns(std::move(columns_description));
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
|
||||
Pipe read(
|
||||
void read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t /*max_block_size*/,
|
||||
size_t /*num_streams*/) override
|
||||
{
|
||||
storage_snapshot->check(column_names);
|
||||
Block sample_block = storage_snapshot->metadata->getSampleBlockWithVirtuals(getVirtuals());
|
||||
|
||||
if (supportsColumnsMask())
|
||||
{
|
||||
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
|
||||
query_info.columns_mask = std::move(columns_mask);
|
||||
sample_block = std::move(header);
|
||||
}
|
||||
|
||||
MutableColumns res_columns = sample_block.cloneEmptyColumns();
|
||||
fillData(res_columns, context, query_info);
|
||||
|
||||
UInt64 num_rows = res_columns.at(0)->size();
|
||||
Chunk chunk(std::move(res_columns), num_rows);
|
||||
|
||||
return Pipe(std::make_shared<SourceFromSingleChunk>(sample_block, std::move(chunk)));
|
||||
}
|
||||
size_t /*num_streams*/) override;
|
||||
|
||||
bool isSystemStorage() const override { return true; }
|
||||
|
||||
|
@ -13,7 +13,7 @@ ColumnsDescription StorageSystemAggregateFunctionCombinators::getColumnsDescript
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemAggregateFunctionCombinators::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemAggregateFunctionCombinators::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
const auto & combinators = AggregateFunctionCombinatorFactory::instance().getAllAggregateFunctionCombinators();
|
||||
for (const auto & pair : combinators)
|
||||
|
@ -6,10 +6,10 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class StorageSystemAggregateFunctionCombinators final : public IStorageSystemOneBlock<StorageSystemAggregateFunctionCombinators>
|
||||
class StorageSystemAggregateFunctionCombinators final : public IStorageSystemOneBlock
|
||||
{
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
public:
|
||||
|
@ -74,7 +74,7 @@ ColumnsDescription StorageSystemAsyncLoader::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemAsyncLoader::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemAsyncLoader::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
TimePoint now = std::chrono::system_clock::now();
|
||||
|
||||
|
@ -10,7 +10,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
/// system.asynchronous_loader table. Takes data from context.getAsyncLoader()
|
||||
class StorageSystemAsyncLoader final : public IStorageSystemOneBlock<StorageSystemAsyncLoader>
|
||||
class StorageSystemAsyncLoader final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemAsyncLoader"; }
|
||||
@ -20,7 +20,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ ColumnsDescription StorageSystemAsynchronousInserts::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
using namespace std::chrono;
|
||||
|
||||
|
@ -8,7 +8,7 @@ namespace DB
|
||||
/** Implements the system table `asynhronous_inserts`,
|
||||
* which contains information about pending asynchronous inserts in queue.
|
||||
*/
|
||||
class StorageSystemAsynchronousInserts final : public IStorageSystemOneBlock<StorageSystemAsynchronousInserts>
|
||||
class StorageSystemAsynchronousInserts final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemAsynchronousInserts"; }
|
||||
@ -16,7 +16,7 @@ public:
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -19,11 +19,11 @@ ColumnsDescription StorageSystemAsynchronousMetrics::getColumnsDescription()
|
||||
|
||||
|
||||
StorageSystemAsynchronousMetrics::StorageSystemAsynchronousMetrics(const StorageID & table_id_, const AsynchronousMetrics & async_metrics_)
|
||||
: IStorageSystemOneBlock(table_id_), async_metrics(async_metrics_)
|
||||
: IStorageSystemOneBlock(table_id_, getColumnsDescription()), async_metrics(async_metrics_)
|
||||
{
|
||||
}
|
||||
|
||||
void StorageSystemAsynchronousMetrics::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemAsynchronousMetrics::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
auto async_metrics_values = async_metrics.getValues();
|
||||
for (const auto & name_value : async_metrics_values)
|
||||
|
@ -11,7 +11,7 @@ class Context;
|
||||
|
||||
/** Implements system table asynchronous_metrics, which allows to get values of periodically (asynchronously) updated metrics.
|
||||
*/
|
||||
class StorageSystemAsynchronousMetrics final : public IStorageSystemOneBlock<StorageSystemAsynchronousMetrics>
|
||||
class StorageSystemAsynchronousMetrics final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
StorageSystemAsynchronousMetrics(const StorageID & table_id_, const AsynchronousMetrics & async_metrics_);
|
||||
@ -24,7 +24,7 @@ private:
|
||||
const AsynchronousMetrics & async_metrics;
|
||||
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ ColumnsDescription StorageSystemBackups::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemBackups::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemBackups::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
size_t column_index = 0;
|
||||
auto & column_id = assert_cast<ColumnString &>(*res_columns[column_index++]);
|
||||
|
@ -7,7 +7,7 @@ namespace DB
|
||||
{
|
||||
|
||||
/// Implements `grants` system table, which allows you to get information about grants.
|
||||
class StorageSystemBackups final : public IStorageSystemOneBlock<StorageSystemBackups>
|
||||
class StorageSystemBackups final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemBackups"; }
|
||||
@ -15,7 +15,7 @@ public:
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ ColumnsDescription StorageSystemBuildOptions::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemBuildOptions::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemBuildOptions::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
for (auto * it = auto_config_build; *it; it += 2)
|
||||
{
|
||||
|
@ -11,10 +11,10 @@ class Context;
|
||||
|
||||
/** System table "build_options" with many params used for clickhouse building
|
||||
*/
|
||||
class StorageSystemBuildOptions final : public IStorageSystemOneBlock<StorageSystemBuildOptions>
|
||||
class StorageSystemBuildOptions final : public IStorageSystemOneBlock
|
||||
{
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
|
@ -169,7 +169,7 @@ static void enumCertificates(const std::string & dir, bool def, MutableColumns &
|
||||
|
||||
#endif
|
||||
|
||||
void StorageSystemCertificates::fillData([[maybe_unused]] MutableColumns & res_columns, ContextPtr/* context*/, const SelectQueryInfo &) const
|
||||
void StorageSystemCertificates::fillData([[maybe_unused]] MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
#if USE_SSL
|
||||
const auto & ca_paths = Poco::Net::SSLManager::instance().defaultServerContext()->getCAPaths();
|
||||
|
@ -13,7 +13,7 @@ class Cluster;
|
||||
* that allows to obtain information about available certificates
|
||||
* and their sources.
|
||||
*/
|
||||
class StorageSystemCertificates final : public IStorageSystemOneBlock<StorageSystemCertificates>
|
||||
class StorageSystemCertificates final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemCertificates"; }
|
||||
@ -23,7 +23,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ ColumnsDescription StorageSystemClusters::getColumnsDescription()
|
||||
return description;
|
||||
}
|
||||
|
||||
void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
for (const auto & name_and_cluster : context->getClusters())
|
||||
writeCluster(res_columns, name_and_cluster, {});
|
||||
|
@ -15,7 +15,7 @@ class Cluster;
|
||||
* that allows to obtain information about available clusters
|
||||
* (which may be specified in Distributed tables).
|
||||
*/
|
||||
class StorageSystemClusters final : public IStorageSystemOneBlock<StorageSystemClusters>
|
||||
class StorageSystemClusters final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemClusters"; }
|
||||
@ -26,7 +26,7 @@ protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
using NameAndCluster = std::pair<String, std::shared_ptr<Cluster>>;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const std::vector<UInt8> & is_active);
|
||||
};
|
||||
|
||||
|
@ -1,6 +1,8 @@
|
||||
#include <Columns/Collator.h>
|
||||
#include <Storages/System/StorageSystemCollations.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -14,7 +16,7 @@ ColumnsDescription StorageSystemCollations::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemCollations::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemCollations::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
for (const auto & [locale, lang]: AvailableCollationLocales::instance().getAvailableCollations())
|
||||
{
|
||||
|
@ -5,10 +5,10 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageSystemCollations final : public IStorageSystemOneBlock<StorageSystemCollations>
|
||||
class StorageSystemCollations final : public IStorageSystemOneBlock
|
||||
{
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
|
@ -16,6 +16,9 @@
|
||||
#include <Databases/IDatabase.h>
|
||||
#include <Processors/Sources/NullSource.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/SourceStepWithFilter.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -291,8 +294,51 @@ private:
|
||||
std::chrono::milliseconds lock_acquire_timeout;
|
||||
};
|
||||
|
||||
class ReadFromSystemColumns : public SourceStepWithFilter
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "ReadFromSystemColumns"; }
|
||||
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
|
||||
Pipe StorageSystemColumns::read(
|
||||
ReadFromSystemColumns(
|
||||
const Names & column_names_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const ContextPtr & context_,
|
||||
Block sample_block,
|
||||
std::shared_ptr<StorageSystemColumns> storage_,
|
||||
std::vector<UInt8> columns_mask_,
|
||||
size_t max_block_size_)
|
||||
: SourceStepWithFilter(
|
||||
DataStream{.header = std::move(sample_block)},
|
||||
column_names_,
|
||||
query_info_,
|
||||
storage_snapshot_,
|
||||
context_)
|
||||
, storage(std::move(storage_))
|
||||
, columns_mask(std::move(columns_mask_))
|
||||
, max_block_size(max_block_size_)
|
||||
{
|
||||
}
|
||||
|
||||
void applyFilters(ActionDAGNodes added_filter_nodes) override;
|
||||
|
||||
private:
|
||||
std::shared_ptr<StorageSystemColumns> storage;
|
||||
std::vector<UInt8> columns_mask;
|
||||
const size_t max_block_size;
|
||||
const ActionsDAG::Node * predicate = nullptr;
|
||||
};
|
||||
|
||||
void ReadFromSystemColumns::applyFilters(ActionDAGNodes added_filter_nodes)
|
||||
{
|
||||
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
|
||||
if (filter_actions_dag)
|
||||
predicate = filter_actions_dag->getOutputs().at(0);
|
||||
}
|
||||
|
||||
void StorageSystemColumns::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -306,9 +352,22 @@ Pipe StorageSystemColumns::read(
|
||||
|
||||
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
|
||||
|
||||
|
||||
auto this_ptr = std::static_pointer_cast<StorageSystemColumns>(shared_from_this());
|
||||
|
||||
auto reading = std::make_unique<ReadFromSystemColumns>(
|
||||
column_names, query_info, storage_snapshot,
|
||||
std::move(context), std::move(header), std::move(this_ptr), std::move(columns_mask), max_block_size);
|
||||
|
||||
query_plan.addStep(std::move(reading));
|
||||
}
|
||||
|
||||
void ReadFromSystemColumns::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
Block block_to_filter;
|
||||
Storages storages;
|
||||
Pipes pipes;
|
||||
auto header = getOutputStream().header;
|
||||
|
||||
{
|
||||
/// Add `database` column.
|
||||
@ -338,12 +397,13 @@ Pipe StorageSystemColumns::read(
|
||||
block_to_filter.insert(ColumnWithTypeAndName(std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));
|
||||
|
||||
/// Filter block with `database` column.
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
|
||||
|
||||
if (!block_to_filter.rows())
|
||||
{
|
||||
pipes.emplace_back(std::make_shared<NullSource>(header));
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
pipes.emplace_back(std::make_shared<NullSource>(std::move(header)));
|
||||
pipeline.init(Pipe::unitePipes(std::move(pipes)));
|
||||
return;
|
||||
}
|
||||
|
||||
ColumnPtr & database_column = block_to_filter.getByName("database").column;
|
||||
@ -384,12 +444,13 @@ Pipe StorageSystemColumns::read(
|
||||
}
|
||||
|
||||
/// Filter block with `database` and `table` columns.
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
|
||||
|
||||
if (!block_to_filter.rows())
|
||||
{
|
||||
pipes.emplace_back(std::make_shared<NullSource>(header));
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
pipes.emplace_back(std::make_shared<NullSource>(std::move(header)));
|
||||
pipeline.init(Pipe::unitePipes(std::move(pipes)));
|
||||
return;
|
||||
}
|
||||
|
||||
ColumnPtr filtered_database_column = block_to_filter.getByName("database").column;
|
||||
@ -400,7 +461,7 @@ Pipe StorageSystemColumns::read(
|
||||
std::move(filtered_database_column), std::move(filtered_table_column),
|
||||
std::move(storages), context));
|
||||
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
pipeline.init(Pipe::unitePipes(std::move(pipes)));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,7 +17,8 @@ public:
|
||||
|
||||
std::string getName() const override { return "SystemColumns"; }
|
||||
|
||||
Pipe read(
|
||||
void read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
|
@ -17,7 +17,7 @@ ColumnsDescription StorageSystemContributors::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemContributors::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemContributors::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
std::vector<const char *> contributors;
|
||||
for (auto * it = auto_contributors; *it; ++it)
|
||||
|
@ -9,10 +9,10 @@ class Context;
|
||||
|
||||
/** System table "contributors" with list of clickhouse contributors
|
||||
*/
|
||||
class StorageSystemContributors final : public IStorageSystemOneBlock<StorageSystemContributors>
|
||||
class StorageSystemContributors final : public IStorageSystemOneBlock
|
||||
{
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
|
@ -22,7 +22,7 @@ ColumnsDescription StorageSystemCurrentRoles::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemCurrentRoles::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemCurrentRoles::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
auto roles_info = context->getRolesInfo();
|
||||
auto user = context->getUser();
|
||||
|
@ -8,7 +8,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
/// Implements `current_roles` system table, which allows you to get information about current roles.
|
||||
class StorageSystemCurrentRoles final : public IStorageSystemOneBlock<StorageSystemCurrentRoles>
|
||||
class StorageSystemCurrentRoles final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemCurrentRoles"; }
|
||||
@ -16,7 +16,7 @@ public:
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -204,7 +204,7 @@ static void fillStatusColumns(MutableColumns & res_columns, size_t & col,
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
auto& ddl_worker = context->getDDLWorker();
|
||||
fs::path ddl_zookeeper_path = ddl_worker.getQueueDir();
|
||||
|
@ -11,10 +11,10 @@ class Context;
|
||||
|
||||
/** System table "distributed_ddl_queue" with list of queries that are currently in the DDL worker queue.
|
||||
*/
|
||||
class StorageSystemDDLWorkerQueue final : public IStorageSystemOneBlock<StorageSystemDDLWorkerQueue>
|
||||
class StorageSystemDDLWorkerQueue final : public IStorageSystemOneBlock
|
||||
{
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
|
@ -32,7 +32,7 @@ ColumnsDescription StorageSystemDNSCache::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemDNSCache::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemDNSCache::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
using HostIPPair = std::pair<std::string, std::string>;
|
||||
std::unordered_set<HostIPPair, boost::hash<std::pair<std::string, std::string>>> reported_elements;
|
||||
|
@ -9,7 +9,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
/// system.dns_cache table.
|
||||
class StorageSystemDNSCache final : public IStorageSystemOneBlock<StorageSystemDNSCache>
|
||||
class StorageSystemDNSCache final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemDNSCache"; }
|
||||
@ -19,7 +19,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ String trim(const char * text)
|
||||
return String(view);
|
||||
}
|
||||
|
||||
void StorageSystemDashboards::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemDashboards::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
static const std::vector<std::map<String, String>> dashboards
|
||||
{
|
||||
|
@ -12,7 +12,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
|
||||
class StorageSystemDashboards final : public IStorageSystemOneBlock<StorageSystemDashboards>
|
||||
class StorageSystemDashboards final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemDashboards"; }
|
||||
@ -22,7 +22,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -12,7 +12,10 @@
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Processors/ISource.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/SourceStepWithFilter.h>
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -176,7 +179,51 @@ private:
|
||||
DatabaseTablesIteratorPtr tables_it;
|
||||
};
|
||||
|
||||
Pipe StorageSystemDataSkippingIndices::read(
|
||||
class ReadFromSystemDataSkippingIndices : public SourceStepWithFilter
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "ReadFromSystemDataSkippingIndices"; }
|
||||
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
|
||||
ReadFromSystemDataSkippingIndices(
|
||||
const Names & column_names_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const ContextPtr & context_,
|
||||
Block sample_block,
|
||||
std::shared_ptr<StorageSystemDataSkippingIndices> storage_,
|
||||
std::vector<UInt8> columns_mask_,
|
||||
size_t max_block_size_)
|
||||
: SourceStepWithFilter(
|
||||
DataStream{.header = std::move(sample_block)},
|
||||
column_names_,
|
||||
query_info_,
|
||||
storage_snapshot_,
|
||||
context_)
|
||||
, storage(std::move(storage_))
|
||||
, columns_mask(std::move(columns_mask_))
|
||||
, max_block_size(max_block_size_)
|
||||
{
|
||||
}
|
||||
|
||||
void applyFilters(ActionDAGNodes added_filter_nodes) override;
|
||||
|
||||
private:
|
||||
std::shared_ptr<StorageSystemDataSkippingIndices> storage;
|
||||
std::vector<UInt8> columns_mask;
|
||||
const size_t max_block_size;
|
||||
const ActionsDAG::Node * predicate = nullptr;
|
||||
};
|
||||
|
||||
void ReadFromSystemDataSkippingIndices::applyFilters(ActionDAGNodes added_filter_nodes)
|
||||
{
|
||||
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
|
||||
if (filter_actions_dag)
|
||||
predicate = filter_actions_dag->getOutputs().at(0);
|
||||
}
|
||||
|
||||
void StorageSystemDataSkippingIndices::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -190,6 +237,17 @@ Pipe StorageSystemDataSkippingIndices::read(
|
||||
|
||||
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
|
||||
|
||||
auto this_ptr = std::static_pointer_cast<StorageSystemDataSkippingIndices>(shared_from_this());
|
||||
|
||||
auto reading = std::make_unique<ReadFromSystemDataSkippingIndices>(
|
||||
column_names, query_info, storage_snapshot,
|
||||
std::move(context), std::move(header), std::move(this_ptr), std::move(columns_mask), max_block_size);
|
||||
|
||||
query_plan.addStep(std::move(reading));
|
||||
}
|
||||
|
||||
void ReadFromSystemDataSkippingIndices::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
MutableColumnPtr column = ColumnString::create();
|
||||
|
||||
const auto databases = DatabaseCatalog::instance().getDatabases();
|
||||
@ -207,11 +265,11 @@ Pipe StorageSystemDataSkippingIndices::read(
|
||||
|
||||
/// Condition on "database" in a query acts like an index.
|
||||
Block block { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database") };
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, block, context);
|
||||
|
||||
ColumnPtr & filtered_databases = block.getByPosition(0).column;
|
||||
return Pipe(std::make_shared<DataSkippingIndicesSource>(
|
||||
std::move(columns_mask), std::move(header), max_block_size, std::move(filtered_databases), context));
|
||||
pipeline.init(Pipe(std::make_shared<DataSkippingIndicesSource>(
|
||||
std::move(columns_mask), getOutputStream().header, max_block_size, std::move(filtered_databases), context)));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -14,7 +14,8 @@ public:
|
||||
|
||||
std::string getName() const override { return "SystemDataSkippingIndices"; }
|
||||
|
||||
Pipe read(
|
||||
void read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
|
@ -17,7 +17,7 @@ ColumnsDescription StorageSystemDataTypeFamilies::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemDataTypeFamilies::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemDataTypeFamilies::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
const auto & factory = DataTypeFactory::instance();
|
||||
auto names = factory.getAllRegisteredNames();
|
||||
|
@ -5,10 +5,10 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageSystemDataTypeFamilies final : public IStorageSystemOneBlock<StorageSystemDataTypeFamilies>
|
||||
class StorageSystemDataTypeFamilies final : public IStorageSystemOneBlock
|
||||
{
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
|
@ -14,7 +14,7 @@ ColumnsDescription StorageSystemDatabaseEngines::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemDatabaseEngines::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemDatabaseEngines::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
for (const auto & [engine, _] : DatabaseFactory::instance().getDatabaseEngines())
|
||||
{
|
||||
|
@ -6,10 +6,10 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageSystemDatabaseEngines final : public IStorageSystemOneBlock<StorageSystemDatabaseEngines>
|
||||
class StorageSystemDatabaseEngines final : public IStorageSystemOneBlock
|
||||
{
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
|
@ -72,7 +72,7 @@ static String getEngineFull(const ContextPtr & ctx, const DatabasePtr & database
|
||||
return engine_full;
|
||||
}
|
||||
|
||||
static ColumnPtr getFilteredDatabases(const Databases & databases, const SelectQueryInfo & query_info, ContextPtr context)
|
||||
static ColumnPtr getFilteredDatabases(const Databases & databases, const ActionsDAG::Node * predicate, ContextPtr context)
|
||||
{
|
||||
MutableColumnPtr name_column = ColumnString::create();
|
||||
MutableColumnPtr engine_column = ColumnString::create();
|
||||
@ -94,17 +94,17 @@ static ColumnPtr getFilteredDatabases(const Databases & databases, const SelectQ
|
||||
ColumnWithTypeAndName(std::move(engine_column), std::make_shared<DataTypeString>(), "engine"),
|
||||
ColumnWithTypeAndName(std::move(uuid_column), std::make_shared<DataTypeUUID>(), "uuid")
|
||||
};
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, block, context);
|
||||
return block.getByPosition(0).column;
|
||||
}
|
||||
|
||||
void StorageSystemDatabases::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const
|
||||
void StorageSystemDatabases::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8> columns_mask) const
|
||||
{
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_DATABASES);
|
||||
|
||||
const auto databases = DatabaseCatalog::instance().getDatabases();
|
||||
ColumnPtr filtered_databases_column = getFilteredDatabases(databases, query_info, context);
|
||||
ColumnPtr filtered_databases_column = getFilteredDatabases(databases, predicate, context);
|
||||
|
||||
for (size_t i = 0; i < filtered_databases_column->size(); ++i)
|
||||
{
|
||||
@ -120,7 +120,6 @@ void StorageSystemDatabases::fillData(MutableColumns & res_columns, ContextPtr c
|
||||
|
||||
size_t src_index = 0;
|
||||
size_t res_index = 0;
|
||||
const auto & columns_mask = query_info.columns_mask;
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database_name);
|
||||
if (columns_mask[src_index++])
|
||||
|
@ -11,7 +11,7 @@ class Context;
|
||||
|
||||
/** Implements `databases` system table, which allows you to get information about all databases.
|
||||
*/
|
||||
class StorageSystemDatabases final : public IStorageSystemOneBlock<StorageSystemDatabases>
|
||||
class StorageSystemDatabases final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override
|
||||
@ -26,7 +26,7 @@ protected:
|
||||
|
||||
bool supportsColumnsMask() const override { return true; }
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8> columns_mask) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -12,6 +12,8 @@
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <IO/SharedThreadPools.h>
|
||||
#include <Common/threadPoolCallbackRunner.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
|
||||
#include <mutex>
|
||||
|
||||
@ -285,7 +287,53 @@ StorageSystemDetachedParts::StorageSystemDetachedParts(const StorageID & table_i
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
|
||||
Pipe StorageSystemDetachedParts::read(
|
||||
class ReadFromSystemDetachedParts : public SourceStepWithFilter
|
||||
{
|
||||
public:
|
||||
ReadFromSystemDetachedParts(
|
||||
const Names & column_names_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const ContextPtr & context_,
|
||||
Block sample_block,
|
||||
std::shared_ptr<StorageSystemDetachedParts> storage_,
|
||||
std::vector<UInt8> columns_mask_,
|
||||
size_t max_block_size_,
|
||||
size_t num_streams_)
|
||||
: SourceStepWithFilter(
|
||||
DataStream{.header = std::move(sample_block)},
|
||||
column_names_,
|
||||
query_info_,
|
||||
storage_snapshot_,
|
||||
context_)
|
||||
, storage(std::move(storage_))
|
||||
, columns_mask(std::move(columns_mask_))
|
||||
, max_block_size(max_block_size_)
|
||||
, num_streams(num_streams_)
|
||||
{}
|
||||
|
||||
std::string getName() const override { return "ReadFromSystemDetachedParts"; }
|
||||
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
void applyFilters(ActionDAGNodes added_filter_nodes) override;
|
||||
|
||||
protected:
|
||||
std::shared_ptr<StorageSystemDetachedParts> storage;
|
||||
std::vector<UInt8> columns_mask;
|
||||
|
||||
const ActionsDAG::Node * predicate = nullptr;
|
||||
const size_t max_block_size;
|
||||
const size_t num_streams;
|
||||
};
|
||||
|
||||
void ReadFromSystemDetachedParts::applyFilters(ActionDAGNodes added_filter_nodes)
|
||||
{
|
||||
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
|
||||
if (filter_actions_dag)
|
||||
predicate = filter_actions_dag->getOutputs().at(0);
|
||||
}
|
||||
|
||||
void StorageSystemDetachedParts::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -299,17 +347,28 @@ Pipe StorageSystemDetachedParts::read(
|
||||
|
||||
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
|
||||
|
||||
auto state = std::make_shared<SourceState>(StoragesInfoStream(query_info, context));
|
||||
auto this_ptr = std::static_pointer_cast<StorageSystemDetachedParts>(shared_from_this());
|
||||
|
||||
auto reading = std::make_unique<ReadFromSystemDetachedParts>(
|
||||
column_names, query_info, storage_snapshot,
|
||||
std::move(context), std::move(header), std::move(this_ptr), std::move(columns_mask), max_block_size, num_streams);
|
||||
|
||||
query_plan.addStep(std::move(reading));
|
||||
}
|
||||
|
||||
void ReadFromSystemDetachedParts::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
auto state = std::make_shared<SourceState>(StoragesInfoStream(predicate, context));
|
||||
|
||||
Pipe pipe;
|
||||
|
||||
for (size_t i = 0; i < num_streams; ++i)
|
||||
{
|
||||
auto source = std::make_shared<DetachedPartsSource>(header.cloneEmpty(), state, columns_mask, max_block_size);
|
||||
auto source = std::make_shared<DetachedPartsSource>(getOutputStream().header, state, columns_mask, max_block_size);
|
||||
pipe.addSource(std::move(source));
|
||||
}
|
||||
|
||||
return pipe;
|
||||
pipeline.init(std::move(pipe));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,14 +20,15 @@ public:
|
||||
bool isSystemStorage() const override { return true; }
|
||||
|
||||
protected:
|
||||
Pipe read(
|
||||
const Names & /* column_names */,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t /*max_block_size*/,
|
||||
size_t /*num_streams*/) override;
|
||||
void read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & /* column_names */,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t /*max_block_size*/,
|
||||
size_t /*num_streams*/) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -99,7 +99,7 @@ NamesAndTypesList StorageSystemDictionaries::getVirtuals() const
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemDictionaries::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & /*query_info*/) const
|
||||
void StorageSystemDictionaries::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_dictionaries = access->isGranted(AccessType::SHOW_DICTIONARIES);
|
||||
|
@ -9,7 +9,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
|
||||
class StorageSystemDictionaries final : public IStorageSystemOneBlock<StorageSystemDictionaries>
|
||||
class StorageSystemDictionaries final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemDictionaries"; }
|
||||
@ -21,7 +21,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -107,7 +107,7 @@ ColumnsDescription StorageSystemDistributionQueue::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const
|
||||
void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const
|
||||
{
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
||||
@ -159,7 +159,7 @@ void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, Cont
|
||||
{ col_table_to_filter, std::make_shared<DataTypeString>(), "table" },
|
||||
};
|
||||
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context);
|
||||
|
||||
if (!filtered_block.rows())
|
||||
return;
|
||||
|
@ -11,7 +11,7 @@ class Context;
|
||||
|
||||
/** Implements the `distribution_queue` system table, which allows you to view the INSERT queues for the Distributed tables.
|
||||
*/
|
||||
class StorageSystemDistributionQueue final : public IStorageSystemOneBlock<StorageSystemDistributionQueue>
|
||||
class StorageSystemDistributionQueue final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemDistributionQueue"; }
|
||||
@ -21,7 +21,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ ColumnsDescription StorageSystemDroppedTables::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemDroppedTables::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemDroppedTables::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
auto tables_mark_dropped = DatabaseCatalog::instance().getTablesMarkedDropped();
|
||||
|
||||
|
@ -6,7 +6,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageSystemDroppedTables final : public IStorageSystemOneBlock<StorageSystemDroppedTables>
|
||||
class StorageSystemDroppedTables final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemMarkedDroppedTables"; }
|
||||
@ -14,7 +14,7 @@ public:
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ namespace DB
|
||||
{
|
||||
|
||||
|
||||
StoragesDroppedInfoStream::StoragesDroppedInfoStream(const SelectQueryInfo & query_info, ContextPtr context)
|
||||
StoragesDroppedInfoStream::StoragesDroppedInfoStream(const ActionsDAG::Node * predicate, ContextPtr context)
|
||||
: StoragesInfoStreamBase(context)
|
||||
{
|
||||
/// Will apply WHERE to subset of columns and then add more columns.
|
||||
@ -73,7 +73,7 @@ StoragesDroppedInfoStream::StoragesDroppedInfoStream(const SelectQueryInfo & que
|
||||
if (block_to_filter.rows())
|
||||
{
|
||||
/// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'.
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
|
||||
rows = block_to_filter.rows();
|
||||
}
|
||||
|
||||
|
@ -9,7 +9,7 @@ namespace DB
|
||||
class StoragesDroppedInfoStream : public StoragesInfoStreamBase
|
||||
{
|
||||
public:
|
||||
StoragesDroppedInfoStream(const SelectQueryInfo & query_info, ContextPtr context);
|
||||
StoragesDroppedInfoStream(const ActionsDAG::Node * predicate, ContextPtr context);
|
||||
protected:
|
||||
bool tryLockTable(StoragesInfo &) override
|
||||
{
|
||||
@ -30,9 +30,9 @@ public:
|
||||
|
||||
std::string getName() const override { return "SystemDroppedTablesParts"; }
|
||||
protected:
|
||||
std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const SelectQueryInfo & query_info, ContextPtr context) override
|
||||
std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context) override
|
||||
{
|
||||
return std::make_unique<StoragesDroppedInfoStream>(query_info, context);
|
||||
return std::make_unique<StoragesDroppedInfoStream>(predicate, context);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -23,7 +23,7 @@ ColumnsDescription StorageSystemEnabledRoles::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemEnabledRoles::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemEnabledRoles::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
auto roles_info = context->getRolesInfo();
|
||||
auto user = context->getUser();
|
||||
|
@ -8,7 +8,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
/// Implements `enabled_roles` system table, which allows you to get information about enabled roles.
|
||||
class StorageSystemEnabledRoles final : public IStorageSystemOneBlock<StorageSystemEnabledRoles>
|
||||
class StorageSystemEnabledRoles final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemEnabledRoles"; }
|
||||
@ -16,7 +16,7 @@ public:
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ ColumnsDescription StorageSystemErrors::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemErrors::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemErrors::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
auto add_row = [&](std::string_view name, size_t code, const auto & error, bool remote)
|
||||
{
|
||||
|
@ -13,7 +13,7 @@ class Context;
|
||||
* Implements the `errors` system table, which shows the error code and the number of times it happens
|
||||
* (i.e. Exception with this code had been thrown).
|
||||
*/
|
||||
class StorageSystemErrors final : public IStorageSystemOneBlock<StorageSystemErrors>
|
||||
class StorageSystemErrors final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemErrors"; }
|
||||
@ -23,7 +23,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ ColumnsDescription StorageSystemEvents::getColumnsDescription()
|
||||
return description;
|
||||
}
|
||||
|
||||
void StorageSystemEvents::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemEvents::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
|
||||
{
|
||||
|
@ -10,7 +10,7 @@ class Context;
|
||||
|
||||
/** Implements `events` system table, which allows you to obtain information for profiling.
|
||||
*/
|
||||
class StorageSystemEvents final : public IStorageSystemOneBlock<StorageSystemEvents>
|
||||
class StorageSystemEvents final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemEvents"; }
|
||||
@ -20,7 +20,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -36,11 +36,11 @@ ColumnsDescription StorageSystemFilesystemCache::getColumnsDescription()
|
||||
}
|
||||
|
||||
StorageSystemFilesystemCache::StorageSystemFilesystemCache(const StorageID & table_id_)
|
||||
: IStorageSystemOneBlock(table_id_)
|
||||
: IStorageSystemOneBlock(table_id_, getColumnsDescription())
|
||||
{
|
||||
}
|
||||
|
||||
void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
auto caches = FileCacheFactory::instance().getAll();
|
||||
|
||||
|
@ -29,7 +29,7 @@ namespace DB
|
||||
* FORMAT Vertical
|
||||
*/
|
||||
|
||||
class StorageSystemFilesystemCache final : public IStorageSystemOneBlock<StorageSystemFilesystemCache>
|
||||
class StorageSystemFilesystemCache final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
explicit StorageSystemFilesystemCache(const StorageID & table_id_);
|
||||
@ -39,7 +39,7 @@ public:
|
||||
static ColumnsDescription getColumnsDescription();
|
||||
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ ColumnsDescription StorageSystemFormats::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemFormats::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemFormats::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
const auto & formats = FormatFactory::instance().getAllFormats();
|
||||
for (const auto & pair : formats)
|
||||
|
@ -4,10 +4,10 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class StorageSystemFormats final : public IStorageSystemOneBlock<StorageSystemFormats>
|
||||
class StorageSystemFormats final : public IStorageSystemOneBlock
|
||||
{
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
|
@ -133,7 +133,7 @@ ColumnsDescription StorageSystemFunctions::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemFunctions::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemFunctions::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
const auto & functions_factory = FunctionFactory::instance();
|
||||
const auto & function_names = functions_factory.getAllRegisteredNames();
|
||||
|
@ -12,7 +12,7 @@ class Context;
|
||||
/** Implements `functions`system table, which allows you to get a list
|
||||
* all normal and aggregate functions.
|
||||
*/
|
||||
class StorageSystemFunctions final : public IStorageSystemOneBlock<StorageSystemFunctions>
|
||||
class StorageSystemFunctions final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemFunctions"; }
|
||||
@ -25,7 +25,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ ColumnsDescription StorageSystemGrants::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemGrants::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemGrants::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
/// If "select_from_system_db_requires_grant" is enabled the access rights were already checked in InterpreterSelectQuery.
|
||||
const auto & access_control = context->getAccessControl();
|
||||
|
@ -8,7 +8,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
/// Implements `grants` system table, which allows you to get information about grants.
|
||||
class StorageSystemGrants final : public IStorageSystemOneBlock<StorageSystemGrants>
|
||||
class StorageSystemGrants final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemGrants"; }
|
||||
@ -16,7 +16,7 @@ public:
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -75,7 +75,7 @@ static StorageSystemGraphite::Configs getConfigs(ContextPtr context)
|
||||
return graphite_configs;
|
||||
}
|
||||
|
||||
void StorageSystemGraphite::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemGraphite::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
Configs graphite_configs = getConfigs(context);
|
||||
|
||||
|
@ -10,7 +10,7 @@ namespace DB
|
||||
{
|
||||
|
||||
/// Provides information about Graphite configuration.
|
||||
class StorageSystemGraphite final : public IStorageSystemOneBlock<StorageSystemGraphite>
|
||||
class StorageSystemGraphite final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemGraphite"; }
|
||||
@ -30,7 +30,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ ColumnsDescription StorageSystemKafkaConsumers::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
auto tables_mark_dropped = DatabaseCatalog::instance().getTablesMarkedDropped();
|
||||
|
||||
|
@ -11,7 +11,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageSystemKafkaConsumers final : public IStorageSystemOneBlock<StorageSystemKafkaConsumers>
|
||||
class StorageSystemKafkaConsumers final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemKafkaConsumers"; }
|
||||
@ -19,7 +19,7 @@ public:
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -19,7 +19,7 @@ ColumnsDescription StorageSystemLicenses::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemLicenses::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemLicenses::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
for (const auto * it = library_licenses; *it; it += 4)
|
||||
{
|
||||
|
@ -10,10 +10,10 @@ class Context;
|
||||
|
||||
/** System table "licenses" with list of licenses of 3rd party libraries
|
||||
*/
|
||||
class StorageSystemLicenses final : public IStorageSystemOneBlock<StorageSystemLicenses>
|
||||
class StorageSystemLicenses final : public IStorageSystemOneBlock
|
||||
{
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
|
@ -15,7 +15,7 @@ ColumnsDescription StorageSystemMacros::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemMacros::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemMacros::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
auto macros = context->getMacros();
|
||||
|
||||
|
@ -12,7 +12,7 @@ class Context;
|
||||
|
||||
/** Information about macros for introspection.
|
||||
*/
|
||||
class StorageSystemMacros final : public IStorageSystemOneBlock<StorageSystemMacros>
|
||||
class StorageSystemMacros final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemMacros"; }
|
||||
@ -22,7 +22,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ ColumnsDescription SystemMergeTreeSettings<replicated>::getColumnsDescription()
|
||||
}
|
||||
|
||||
template <bool replicated>
|
||||
void SystemMergeTreeSettings<replicated>::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void SystemMergeTreeSettings<replicated>::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
const auto & settings = replicated ? context->getReplicatedMergeTreeSettings() : context->getMergeTreeSettings();
|
||||
auto constraints_and_current_profiles = context->getSettingsConstraintsAndCurrentProfiles();
|
||||
|
@ -14,7 +14,7 @@ class Context;
|
||||
* which allows to get information about the current MergeTree settings.
|
||||
*/
|
||||
template <bool replicated>
|
||||
class SystemMergeTreeSettings final : public IStorageSystemOneBlock<SystemMergeTreeSettings<replicated>>
|
||||
class SystemMergeTreeSettings final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return replicated ? "SystemReplicatedMergeTreeSettings" : "SystemMergeTreeSettings"; }
|
||||
@ -22,9 +22,9 @@ public:
|
||||
static ColumnsDescription getColumnsDescription();
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock<SystemMergeTreeSettings<replicated>>::IStorageSystemOneBlock;
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ ColumnsDescription StorageSystemMerges::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemMerges::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemMerges::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_tables = !access->isGranted(AccessType::SHOW_TABLES);
|
||||
|
@ -12,7 +12,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
|
||||
class StorageSystemMerges final : public IStorageSystemOneBlock<StorageSystemMerges>
|
||||
class StorageSystemMerges final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemMerges"; }
|
||||
@ -22,7 +22,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ ColumnsDescription StorageSystemMetrics::getColumnsDescription()
|
||||
return description;
|
||||
}
|
||||
|
||||
void StorageSystemMetrics::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemMetrics::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
|
||||
{
|
||||
|
@ -11,7 +11,7 @@ class Context;
|
||||
|
||||
/** Implements `metrics` system table, which provides information about the operation of the server.
|
||||
*/
|
||||
class StorageSystemMetrics final : public IStorageSystemOneBlock<StorageSystemMetrics>
|
||||
class StorageSystemMetrics final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemMetrics"; }
|
||||
@ -21,7 +21,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ ColumnsDescription StorageSystemModels::getColumnsDescription()
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemModels::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemModels::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
auto bridge_helper = std::make_unique<CatBoostLibraryBridgeHelper>(context);
|
||||
ExternalModelInfos infos = bridge_helper->listModels();
|
||||
|
@ -9,7 +9,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
|
||||
class StorageSystemModels final : public IStorageSystemOneBlock<StorageSystemModels>
|
||||
class StorageSystemModels final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemModels"; }
|
||||
@ -19,7 +19,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ ColumnsDescription StorageSystemMoves::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemMoves::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemMoves::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_tables = !access->isGranted(AccessType::SHOW_TABLES);
|
||||
|
@ -12,7 +12,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
|
||||
class StorageSystemMoves final : public IStorageSystemOneBlock<StorageSystemMoves>
|
||||
class StorageSystemMoves final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemMoves"; }
|
||||
@ -22,7 +22,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ ColumnsDescription StorageSystemMutations::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemMutations::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const
|
||||
void StorageSystemMutations::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const
|
||||
{
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
||||
@ -100,7 +100,7 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, ContextPtr c
|
||||
{ col_table, std::make_shared<DataTypeString>(), "table" },
|
||||
};
|
||||
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context);
|
||||
|
||||
if (!filtered_block.rows())
|
||||
return;
|
||||
|
@ -11,7 +11,7 @@ class Context;
|
||||
|
||||
/// Implements the `mutations` system table, which provides information about the status of mutations
|
||||
/// in the MergeTree tables.
|
||||
class StorageSystemMutations final : public IStorageSystemOneBlock<StorageSystemMutations>
|
||||
class StorageSystemMutations final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
String getName() const override { return "SystemMutations"; }
|
||||
@ -21,7 +21,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -25,11 +25,11 @@ ColumnsDescription StorageSystemNamedCollections::getColumnsDescription()
|
||||
}
|
||||
|
||||
StorageSystemNamedCollections::StorageSystemNamedCollections(const StorageID & table_id_)
|
||||
: IStorageSystemOneBlock(table_id_)
|
||||
: IStorageSystemOneBlock(table_id_, getColumnsDescription())
|
||||
{
|
||||
}
|
||||
|
||||
void StorageSystemNamedCollections::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemNamedCollections::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
const auto & access = context->getAccess();
|
||||
|
||||
|
@ -5,7 +5,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageSystemNamedCollections final : public IStorageSystemOneBlock<StorageSystemNamedCollections>
|
||||
class StorageSystemNamedCollections final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
explicit StorageSystemNamedCollections(const StorageID & table_id_);
|
||||
@ -15,7 +15,7 @@ public:
|
||||
static ColumnsDescription getColumnsDescription();
|
||||
|
||||
protected:
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ ColumnsDescription StorageSystemPartMovesBetweenShards::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemPartMovesBetweenShards::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const
|
||||
void StorageSystemPartMovesBetweenShards::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const
|
||||
{
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
||||
@ -95,7 +95,7 @@ void StorageSystemPartMovesBetweenShards::fillData(MutableColumns & res_columns,
|
||||
{ col_table_to_filter, std::make_shared<DataTypeString>(), "table" },
|
||||
};
|
||||
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context);
|
||||
|
||||
if (!filtered_block.rows())
|
||||
return;
|
||||
|
@ -9,7 +9,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
|
||||
class StorageSystemPartMovesBetweenShards final : public IStorageSystemOneBlock<StorageSystemPartMovesBetweenShards>
|
||||
class StorageSystemPartMovesBetweenShards final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemShardMoves"; }
|
||||
@ -19,7 +19,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,9 @@
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Storages/System/StorageSystemPartsBase.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/SourceStepWithFilter.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
@ -79,7 +82,7 @@ StoragesInfo::getProjectionParts(MergeTreeData::DataPartStateVector & state, boo
|
||||
return data->getProjectionPartsVectorForInternalUsage({State::Active}, &state);
|
||||
}
|
||||
|
||||
StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, ContextPtr context)
|
||||
StoragesInfoStream::StoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context)
|
||||
: StoragesInfoStreamBase(context)
|
||||
{
|
||||
/// Will apply WHERE to subset of columns and then add more columns.
|
||||
@ -111,7 +114,7 @@ StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, Conte
|
||||
std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));
|
||||
|
||||
/// Filter block_to_filter with column 'database'.
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
|
||||
rows = block_to_filter.rows();
|
||||
|
||||
/// Block contains new columns, update database_column.
|
||||
@ -190,7 +193,7 @@ StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, Conte
|
||||
if (rows)
|
||||
{
|
||||
/// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'.
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, block_to_filter, context);
|
||||
rows = block_to_filter.rows();
|
||||
}
|
||||
|
||||
@ -200,8 +203,61 @@ StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, Conte
|
||||
storage_uuid_column = block_to_filter.getByName("uuid").column;
|
||||
}
|
||||
|
||||
class ReadFromSystemPartsBase : public SourceStepWithFilter
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "ReadFromSystemPartsBase"; }
|
||||
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
|
||||
Pipe StorageSystemPartsBase::read(
|
||||
ReadFromSystemPartsBase(
|
||||
const Names & column_names_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const ContextPtr & context_,
|
||||
Block sample_block,
|
||||
std::shared_ptr<StorageSystemPartsBase> storage_,
|
||||
std::vector<UInt8> columns_mask_,
|
||||
bool has_state_column_);
|
||||
|
||||
void applyFilters(ActionDAGNodes added_filter_nodes) override;
|
||||
|
||||
protected:
|
||||
std::shared_ptr<StorageSystemPartsBase> storage;
|
||||
std::vector<UInt8> columns_mask;
|
||||
const bool has_state_column;
|
||||
const ActionsDAG::Node * predicate = nullptr;
|
||||
};
|
||||
|
||||
ReadFromSystemPartsBase::ReadFromSystemPartsBase(
|
||||
const Names & column_names_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const ContextPtr & context_,
|
||||
Block sample_block,
|
||||
std::shared_ptr<StorageSystemPartsBase> storage_,
|
||||
std::vector<UInt8> columns_mask_,
|
||||
bool has_state_column_)
|
||||
: SourceStepWithFilter(
|
||||
DataStream{.header = std::move(sample_block)},
|
||||
column_names_,
|
||||
query_info_,
|
||||
storage_snapshot_,
|
||||
context_)
|
||||
, storage(std::move(storage_))
|
||||
, columns_mask(std::move(columns_mask_))
|
||||
, has_state_column(has_state_column_)
|
||||
{
|
||||
}
|
||||
|
||||
void ReadFromSystemPartsBase::applyFilters(ActionDAGNodes added_filter_nodes)
|
||||
{
|
||||
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
|
||||
if (filter_actions_dag)
|
||||
predicate = filter_actions_dag->getOutputs().at(0);
|
||||
}
|
||||
|
||||
void StorageSystemPartsBase::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -212,29 +268,39 @@ Pipe StorageSystemPartsBase::read(
|
||||
{
|
||||
bool has_state_column = hasStateColumn(column_names, storage_snapshot);
|
||||
|
||||
auto stream = getStoragesInfoStream(query_info, context);
|
||||
|
||||
/// Create the result.
|
||||
Block sample = storage_snapshot->metadata->getSampleBlock();
|
||||
|
||||
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample, column_names);
|
||||
|
||||
MutableColumns res_columns = header.cloneEmptyColumns();
|
||||
if (has_state_column)
|
||||
res_columns.push_back(ColumnString::create());
|
||||
header.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state"));
|
||||
|
||||
auto this_ptr = std::static_pointer_cast<StorageSystemPartsBase>(shared_from_this());
|
||||
|
||||
auto reading = std::make_unique<ReadFromSystemPartsBase>(
|
||||
column_names, query_info, storage_snapshot,
|
||||
std::move(context), std::move(header), std::move(this_ptr), std::move(columns_mask), has_state_column);
|
||||
|
||||
query_plan.addStep(std::move(reading));
|
||||
}
|
||||
|
||||
void ReadFromSystemPartsBase::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
auto stream = storage->getStoragesInfoStream(predicate, context);
|
||||
auto header = getOutputStream().header;
|
||||
|
||||
MutableColumns res_columns = header.cloneEmptyColumns();
|
||||
|
||||
while (StoragesInfo info = stream->next())
|
||||
{
|
||||
processNextStorage(context, res_columns, columns_mask, info, has_state_column);
|
||||
storage->processNextStorage(context, res_columns, columns_mask, info, has_state_column);
|
||||
}
|
||||
|
||||
if (has_state_column)
|
||||
header.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state"));
|
||||
|
||||
UInt64 num_rows = res_columns.at(0)->size();
|
||||
Chunk chunk(std::move(res_columns), num_rows);
|
||||
|
||||
return Pipe(std::make_shared<SourceFromSingleChunk>(std::move(header), std::move(chunk)));
|
||||
pipeline.init(Pipe(std::make_shared<SourceFromSingleChunk>(std::move(header), std::move(chunk))));
|
||||
}
|
||||
|
||||
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Processors/QueryPlan/SourceStepWithFilter.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -114,7 +115,7 @@ protected:
|
||||
class StoragesInfoStream : public StoragesInfoStreamBase
|
||||
{
|
||||
public:
|
||||
StoragesInfoStream(const SelectQueryInfo & query_info, ContextPtr context);
|
||||
StoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context);
|
||||
};
|
||||
|
||||
/** Implements system table 'parts' which allows to get information about data parts for tables of MergeTree family.
|
||||
@ -122,7 +123,8 @@ public:
|
||||
class StorageSystemPartsBase : public IStorage
|
||||
{
|
||||
public:
|
||||
Pipe read(
|
||||
void read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -139,13 +141,15 @@ private:
|
||||
static bool hasStateColumn(const Names & column_names, const StorageSnapshotPtr & storage_snapshot);
|
||||
|
||||
protected:
|
||||
friend class ReadFromSystemPartsBase;
|
||||
|
||||
const FormatSettings format_settings = {};
|
||||
|
||||
StorageSystemPartsBase(const StorageID & table_id_, ColumnsDescription && columns);
|
||||
|
||||
virtual std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const SelectQueryInfo & query_info, ContextPtr context)
|
||||
virtual std::unique_ptr<StoragesInfoStreamBase> getStoragesInfoStream(const ActionsDAG::Node * predicate, ContextPtr context)
|
||||
{
|
||||
return std::make_unique<StoragesInfoStream>(query_info, context);
|
||||
return std::make_unique<StoragesInfoStream>(predicate, context);
|
||||
}
|
||||
|
||||
virtual void
|
||||
|
@ -79,7 +79,7 @@ ColumnsDescription StorageSystemPrivileges::getColumnsDescription()
|
||||
}
|
||||
|
||||
|
||||
void StorageSystemPrivileges::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
|
||||
void StorageSystemPrivileges::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
size_t column_index = 0;
|
||||
auto & column_access_type = assert_cast<ColumnInt16 &>(*res_columns[column_index++]).getData();
|
||||
|
@ -8,7 +8,7 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
/// Implements `privileges` system table, which allows you to get information about access types.
|
||||
class StorageSystemPrivileges final : public IStorageSystemOneBlock<StorageSystemPrivileges>
|
||||
class StorageSystemPrivileges final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemPrivileges"; }
|
||||
@ -17,7 +17,7 @@ public:
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ ColumnsDescription StorageSystemProcesses::getColumnsDescription()
|
||||
return description;
|
||||
}
|
||||
|
||||
void StorageSystemProcesses::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
void StorageSystemProcesses::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
|
||||
{
|
||||
ProcessList::Info info = context->getProcessList().getInfo(true, true, true);
|
||||
|
||||
|
@ -11,7 +11,7 @@ class Context;
|
||||
|
||||
/** Implements `processes` system table, which allows you to get information about the queries that are currently executing.
|
||||
*/
|
||||
class StorageSystemProcesses final : public IStorageSystemOneBlock<StorageSystemProcesses>
|
||||
class StorageSystemProcesses final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemProcesses"; }
|
||||
@ -21,7 +21,7 @@ public:
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user