Merge pull request #58353 from ClickHouse/hdfs-virtuals

Refactor StorageHDFS and StorageFile virtual columns filtering
Nikolai Kochetov 2024-01-04 20:11:41 +01:00 committed by GitHub
commit 4ab6bcbd25
31 changed files with 1019 additions and 451 deletions
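The thread that runs through the whole diff is visible in the iterator constructors below: the ASTPtr query parameter and createPathAndFileFilterAst are replaced by a const ActionsDAG::Node * predicate and createPathAndFileFilterDAG, and path pruning goes through the DAG-based filterByPathOrFile overload. Conceptually the pruning itself is unchanged: each candidate path is turned into values of the _path and _file virtual columns and kept only if the pushed-down predicate accepts them. A minimal standalone sketch of that idea (plain C++ with hypothetical names, not the actual ClickHouse API):

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <vector>

/// Stand-ins for the _path and _file virtual columns of one candidate file.
struct VirtualValues
{
    std::string path;   /// full path (_path)
    std::string file;   /// basename (_file)
};

using Predicate = std::function<bool(const VirtualValues &)>;

/// Mimics the role of VirtualColumnUtils::filterByPathOrFile: keep only the
/// paths accepted by the pushed-down predicate, before any file is opened.
std::vector<std::string> prunePaths(const std::vector<std::string> & paths, const Predicate * predicate)
{
    if (!predicate)                 /// nothing was pushed down: keep everything
        return paths;

    std::vector<std::string> kept;
    for (const auto & path : paths)
    {
        auto pos = path.find_last_of('/');
        VirtualValues values{path, pos == std::string::npos ? path : path.substr(pos + 1)};
        if ((*predicate)(values))
            kept.push_back(path);
    }
    return kept;
}

int main()
{
    std::vector<std::string> uris{"hdfs://ns/data/2024/a.parquet", "hdfs://ns/data/2023/b.parquet"};
    Predicate only_2024 = [](const VirtualValues & v) { return v.path.find("/2024/") != std::string::npos; };
    for (const auto & uri : prunePaths(uris, &only_2024))
        std::cout << uri << '\n';   /// prints only the 2024 file
}
```

In the real code the predicate is the first output of an ActionsDAG built from the pushed-down filter nodes (see the applyFilters implementations below), and the pruning runs once, when the iterator is constructed.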

View File

@ -18,31 +18,25 @@ protected:
void setKeyConditionImpl(const SelectQueryInfo & query_info, ContextPtr context, const Block & keys)
{
if (!context->getSettingsRef().allow_experimental_analyzer)
{
key_condition = std::make_shared<const KeyCondition>(
query_info,
context,
keys.getNames(),
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(keys.getColumnsWithTypeAndName())));
}
key_condition = std::make_shared<const KeyCondition>(
query_info,
context,
keys.getNames(),
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(keys.getColumnsWithTypeAndName())));
}
void setKeyConditionImpl(const ActionsDAG::NodeRawConstPtrs & nodes, ContextPtr context, const Block & keys)
{
if (context->getSettingsRef().allow_experimental_analyzer)
{
std::unordered_map<std::string, DB::ColumnWithTypeAndName> node_name_to_input_column;
for (const auto & column : keys.getColumnsWithTypeAndName())
node_name_to_input_column.insert({column.name, column});
std::unordered_map<std::string, DB::ColumnWithTypeAndName> node_name_to_input_column;
for (const auto & column : keys.getColumnsWithTypeAndName())
node_name_to_input_column.insert({column.name, column});
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(nodes, node_name_to_input_column, context);
key_condition = std::make_shared<const KeyCondition>(
filter_actions_dag,
context,
keys.getNames(),
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(keys.getColumnsWithTypeAndName())));
}
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(nodes, node_name_to_input_column, context);
key_condition = std::make_shared<const KeyCondition>(
filter_actions_dag,
context,
keys.getNames(),
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(keys.getColumnsWithTypeAndName())));
}
public:

View File

@ -15,6 +15,9 @@
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <IO/WriteHelpers.h>
#include <IO/CompressionMethod.h>
@ -408,22 +411,22 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
class HDFSSource::DisclosedGlobIterator::Impl
{
public:
Impl(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
Impl(const String & uri, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
uris = getPathsList(path_from_uri, uri_without_path, context);
ASTPtr filter_ast;
ActionsDAGPtr filter_dag;
if (!uris.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, uris[0].path, context);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
if (filter_ast)
if (filter_dag)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & path_with_info : uris)
paths.push_back(path_with_info.path);
VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, context, filter_ast);
VirtualColumnUtils::filterByPathOrFile(uris, paths, filter_dag, virtual_columns, context);
}
auto file_progress_callback = context->getFileProgressCallback();
@ -456,21 +459,21 @@ private:
class HDFSSource::URISIterator::Impl : WithContext
{
public:
explicit Impl(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context_)
explicit Impl(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context_)
: WithContext(context_), uris(uris_), file_progress_callback(context_->getFileProgressCallback())
{
ASTPtr filter_ast;
ActionsDAGPtr filter_dag;
if (!uris.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, getPathFromUriAndUriWithoutPath(uris[0]).first, getContext());
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
if (filter_ast)
if (filter_dag)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & uri : uris)
paths.push_back(getPathFromUriAndUriWithoutPath(uri).first);
VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, getContext(), filter_ast);
VirtualColumnUtils::filterByPathOrFile(uris, paths, filter_dag, virtual_columns, getContext());
}
if (!uris.empty())
@ -517,16 +520,16 @@ private:
std::function<void(FileProgress)> file_progress_callback;
};
HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(uri, query, virtual_columns, context)) {}
HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(uri, predicate, virtual_columns, context)) {}
StorageHDFS::PathWithInfo HDFSSource::DisclosedGlobIterator::next()
{
return pimpl->next();
}
HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, query, virtual_columns, context))
HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, predicate, virtual_columns, context))
{
}
@ -541,8 +544,7 @@ HDFSSource::HDFSSource(
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IteratorWrapper> file_iterator_,
bool need_only_count_,
const SelectQueryInfo & query_info_)
bool need_only_count_)
: ISource(info.source_header, false)
, WithContext(context_)
, storage(std::move(storage_))
@ -553,7 +555,6 @@ HDFSSource::HDFSSource(
, file_iterator(file_iterator_)
, columns_description(info.columns_description)
, need_only_count(need_only_count_)
, query_info(query_info_)
{
initialize();
}
@ -843,7 +844,57 @@ bool StorageHDFS::supportsSubsetOfColumns(const ContextPtr & context_) const
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context_);
}
Pipe StorageHDFS::read(
class ReadFromHDFS : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromHDFS"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void applyFilters() override;
ReadFromHDFS(
Block sample_block,
ReadFromFormatInfo info_,
bool need_only_count_,
std::shared_ptr<StorageHDFS> storage_,
ContextPtr context_,
size_t max_block_size_,
size_t num_streams_)
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
, info(std::move(info_))
, need_only_count(need_only_count_)
, storage(std::move(storage_))
, context(std::move(context_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
}
private:
ReadFromFormatInfo info;
const bool need_only_count;
std::shared_ptr<StorageHDFS> storage;
ContextPtr context;
size_t max_block_size;
size_t num_streams;
std::shared_ptr<HDFSSource::IteratorWrapper> iterator_wrapper;
void createIterator(const ActionsDAG::Node * predicate);
};
void ReadFromHDFS::applyFilters()
{
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
const ActionsDAG::Node * predicate = nullptr;
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
createIterator(predicate);
}
void StorageHDFS::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -852,18 +903,40 @@ Pipe StorageHDFS::read(
size_t max_block_size,
size_t num_streams)
{
std::shared_ptr<HDFSSource::IteratorWrapper> iterator_wrapper{nullptr};
if (distributed_processing)
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), virtual_columns);
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context_->getSettingsRef().optimize_count_from_files;
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
auto reading = std::make_unique<ReadFromHDFS>(
read_from_format_info.source_header,
std::move(read_from_format_info),
need_only_count,
std::move(this_ptr),
context_,
max_block_size,
num_streams);
query_plan.addStep(std::move(reading));
}
void ReadFromHDFS::createIterator(const ActionsDAG::Node * predicate)
{
if (iterator_wrapper)
return;
if (storage->distributed_processing)
{
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>(
[callback = context_->getReadTaskCallback()]() -> StorageHDFS::PathWithInfo {
[callback = context->getReadTaskCallback()]() -> StorageHDFS::PathWithInfo {
return StorageHDFS::PathWithInfo{callback(), std::nullopt};
});
}
else if (is_path_with_globs)
else if (storage->is_path_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uris[0], query_info.query, virtual_columns, context_);
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(storage->uris[0], predicate, storage->virtual_columns, context);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
{
return glob_iterator->next();
@ -871,31 +944,38 @@ Pipe StorageHDFS::read(
}
else
{
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, query_info.query, virtual_columns, context_);
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(storage->uris, predicate, storage->virtual_columns, context);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
{
return uris_iterator->next();
});
}
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context_->getSettingsRef().optimize_count_from_files;
void ReadFromHDFS::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createIterator(nullptr);
Pipes pipes;
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<HDFSSource>(
read_from_format_info,
this_ptr,
context_,
info,
storage,
context,
max_block_size,
iterator_wrapper,
need_only_count,
query_info));
need_only_count));
}
return Pipe::unitePipes(std::move(pipes));
auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty())
pipe = Pipe(std::make_shared<NullSource>(info.source_header));
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe));
}
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_, bool /*async_insert*/)
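Each storage touched below follows the same two-phase read introduced here for HDFS: StorageHDFS::read() only registers a ReadFromHDFS step on the query plan; applyFilters() turns the pushed-down filter nodes into an ActionsDAG, takes its first output as the predicate and creates the file iterator; initializePipeline() falls back to createIterator(nullptr) when no filter was pushed down and substitutes a NullSource if the united pipe ends up empty. A stripped-down sketch of that control flow (stand-in types, not the real ClickHouse classes):

```cpp
#include <memory>

struct Predicate { };               /// stands in for const ActionsDAG::Node *

struct FileIterator
{
    explicit FileIterator(const Predicate * predicate)
    {
        (void)predicate;            /// prune candidate paths here when predicate != nullptr
    }
};

class ReadFromStorageStep
{
public:
    /// Called by the planner when filters were pushed down: build the iterator
    /// eagerly so listing and pruning happen with the predicate applied.
    void applyFilters(const Predicate * predicate) { createIterator(predicate); }

    /// Called when the pipeline is materialized. If applyFilters() never ran,
    /// fall back to an unfiltered iterator.
    void initializePipeline()
    {
        createIterator(nullptr);
        /// ... create num_streams sources sharing `iterator`, unite them into
        /// a pipe, and replace an empty pipe with a NullSource ...
    }

private:
    void createIterator(const Predicate * predicate)
    {
        if (iterator)               /// idempotent: the first caller wins
            return;
        iterator = std::make_shared<FileIterator>(predicate);
    }

    std::shared_ptr<FileIterator> iterator;
};
```

The same applyFilters/createIterator pair reappears in ReadFromCluster, ReadFromS3Queue, ReadFromAzureBlob and ReadFromFile below.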

View File

@ -51,7 +51,8 @@ public:
String getName() const override { return "HDFS"; }
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -93,6 +94,7 @@ public:
protected:
friend class HDFSSource;
friend class ReadFromHDFS;
private:
std::vector<String> uris;
@ -114,7 +116,7 @@ public:
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(const String & uri_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
DisclosedGlobIterator(const String & uri_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
StorageHDFS::PathWithInfo next();
private:
class Impl;
@ -125,7 +127,7 @@ public:
class URISIterator
{
public:
URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
URISIterator(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
StorageHDFS::PathWithInfo next();
private:
class Impl;
@ -142,8 +144,7 @@ public:
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IteratorWrapper> file_iterator_,
bool need_only_count_,
const SelectQueryInfo & query_info_);
bool need_only_count_);
String getName() const override;
@ -162,7 +163,6 @@ private:
ColumnsDescription columns_description;
bool need_only_count;
size_t total_rows_in_file = 0;
SelectQueryInfo query_info;
std::unique_ptr<ReadBuffer> read_buf;
std::shared_ptr<IInputFormat> input_format;

View File

@ -79,9 +79,9 @@ void StorageHDFSCluster::addColumnsStructureToQuery(ASTPtr & query, const String
}
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const
{
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uri, query, virtual_columns, context);
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uri, predicate, virtual_columns, context);
auto callback = std::make_shared<std::function<String()>>([iter = std::move(iterator)]() mutable -> String { return iter->next().path; });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}

View File

@ -35,7 +35,7 @@ public:
NamesAndTypesList getVirtuals() const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
bool supportsSubcolumns() const override { return true; }

View File

@ -1,7 +1,7 @@
#include "Storages/IStorageCluster.h"
#include <Storages/IStorageCluster.h>
#include "Common/Exception.h"
#include "Core/QueryProcessingStage.h"
#include <Common/Exception.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Context.h>
@ -11,11 +11,14 @@
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Parsers/queryToString.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Parsers/queryToString.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
@ -38,9 +41,66 @@ IStorageCluster::IStorageCluster(
{
}
class ReadFromCluster : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromCluster"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void applyFilters() override;
ReadFromCluster(
Block sample_block,
std::shared_ptr<IStorageCluster> storage_,
ASTPtr query_to_send_,
QueryProcessingStage::Enum processed_stage_,
ClusterPtr cluster_,
Poco::Logger * log_,
ContextPtr context_)
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
, storage(std::move(storage_))
, query_to_send(std::move(query_to_send_))
, processed_stage(processed_stage_)
, cluster(std::move(cluster_))
, log(log_)
, context(std::move(context_))
{
}
private:
std::shared_ptr<IStorageCluster> storage;
ASTPtr query_to_send;
QueryProcessingStage::Enum processed_stage;
ClusterPtr cluster;
Poco::Logger * log;
ContextPtr context;
std::optional<RemoteQueryExecutor::Extension> extension;
void createExtension(const ActionsDAG::Node * predicate);
ContextPtr updateSettings(const Settings & settings);
};
void ReadFromCluster::applyFilters()
{
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
const ActionsDAG::Node * predicate = nullptr;
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
createExtension(predicate);
}
void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
{
if (extension)
return;
extension = storage->getTaskIteratorExtension(predicate, context);
}
/// The code executes on initiator
Pipe IStorageCluster::read(
void IStorageCluster::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -49,10 +109,10 @@ Pipe IStorageCluster::read(
size_t /*max_block_size*/,
size_t /*num_streams*/)
{
updateBeforeRead(context);
storage_snapshot->check(column_names);
updateBeforeRead(context);
auto cluster = getCluster(context);
auto extension = getTaskIteratorExtension(query_info.query, context);
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
@ -70,12 +130,6 @@ Pipe IStorageCluster::read(
query_to_send = interpreter.getQueryInfo().query->clone();
}
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
Pipes pipes;
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
if (!structure_argument_was_provided)
addColumnsStructureToQuery(query_to_send, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), context);
@ -89,7 +143,29 @@ Pipe IStorageCluster::read(
/* only_replace_in_join_= */true);
visitor.visit(query_to_send);
auto new_context = updateSettings(context, context->getSettingsRef());
auto this_ptr = std::static_pointer_cast<IStorageCluster>(shared_from_this());
auto reading = std::make_unique<ReadFromCluster>(
sample_block,
std::move(this_ptr),
std::move(query_to_send),
processed_stage,
cluster,
log,
context);
query_plan.addStep(std::move(reading));
}
void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createExtension(nullptr);
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
Pipes pipes;
auto new_context = updateSettings(context->getSettingsRef());
const auto & current_settings = new_context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard_info : cluster->getShardsInfo())
@ -100,7 +176,7 @@ Pipe IStorageCluster::read(
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
sample_block,
getOutputStream().header,
new_context,
/*throttler=*/nullptr,
scalars,
@ -113,8 +189,14 @@ Pipe IStorageCluster::read(
}
}
storage_snapshot->check(column_names);
return Pipe::unitePipes(std::move(pipes));
auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty())
pipe = Pipe(std::make_shared<NullSource>(getOutputStream().header));
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe));
}
QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
@ -129,7 +211,7 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
return QueryProcessingStage::Enum::FetchColumns;
}
ContextPtr IStorageCluster::updateSettings(ContextPtr context, const Settings & settings)
ContextPtr ReadFromCluster::updateSettings(const Settings & settings)
{
Settings new_settings = settings;

View File

@ -22,7 +22,8 @@ public:
Poco::Logger * log_,
bool structure_argument_was_provided_);
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -33,7 +34,7 @@ public:
ClusterPtr getCluster(ContextPtr context) const;
/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const = 0;
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0;
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
@ -45,8 +46,6 @@ protected:
virtual void addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) = 0;
private:
ContextPtr updateSettings(ContextPtr context, const Settings & settings);
Poco::Logger * log;
String cluster_name;
bool structure_argument_was_provided;

View File

@ -6,11 +6,14 @@
#include <IO/CompressionMethod.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Processors/Sources/NullSource.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <Storages/S3Queue/StorageS3Queue.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
@ -20,6 +23,7 @@
#include <Storages/StorageSnapshot.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/prepareReadingFromFormat.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <filesystem>
@ -204,10 +208,65 @@ bool StorageS3Queue::supportsSubsetOfColumns(const ContextPtr & context_) const
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context_, format_settings);
}
Pipe StorageS3Queue::read(
class ReadFromS3Queue : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromS3Queue"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void applyFilters() override;
ReadFromS3Queue(
Block sample_block,
ReadFromFormatInfo info_,
std::shared_ptr<StorageS3Queue> storage_,
ContextPtr context_,
size_t max_block_size_,
size_t num_streams_)
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
, info(std::move(info_))
, storage(std::move(storage_))
, context(std::move(context_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
}
private:
ReadFromFormatInfo info;
std::shared_ptr<StorageS3Queue> storage;
ContextPtr context;
size_t max_block_size;
size_t num_streams;
std::shared_ptr<StorageS3Queue::FileIterator> iterator;
void createIterator(const ActionsDAG::Node * predicate);
};
void ReadFromS3Queue::createIterator(const ActionsDAG::Node * predicate)
{
if (iterator)
return;
iterator = storage->createFileIterator(context, predicate);
}
void ReadFromS3Queue::applyFilters()
{
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
const ActionsDAG::Node * predicate = nullptr;
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
createIterator(predicate);
}
void StorageS3Queue::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
SelectQueryInfo & /*query_info*/,
ContextPtr local_context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
@ -225,27 +284,49 @@ Pipe StorageS3Queue::read(
"Cannot read from {} with attached materialized views", getName());
}
Pipes pipes;
const size_t adjusted_num_streams = std::min<size_t>(num_streams, s3queue_settings->s3queue_processing_threads_num);
auto this_ptr = std::static_pointer_cast<StorageS3Queue>(shared_from_this());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
auto file_iterator = createFileIterator(local_context, query_info.query);
auto reading = std::make_unique<ReadFromS3Queue>(
read_from_format_info.source_header,
read_from_format_info,
std::move(this_ptr),
local_context,
max_block_size,
num_streams);
query_plan.addStep(std::move(reading));
}
void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
Pipes pipes;
const size_t adjusted_num_streams = std::min<size_t>(num_streams, storage->s3queue_settings->s3queue_processing_threads_num);
createIterator(nullptr);
for (size_t i = 0; i < adjusted_num_streams; ++i)
pipes.emplace_back(createSource(file_iterator, column_names, storage_snapshot, max_block_size, local_context));
return Pipe::unitePipes(std::move(pipes));
pipes.emplace_back(storage->createSource(info, iterator, max_block_size, context));
auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty())
pipe = Pipe(std::make_shared<NullSource>(info.source_header));
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe));
}
std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
size_t max_block_size,
ContextPtr local_context)
{
auto configuration_snapshot = updateConfigurationAndGetCopy(local_context);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
auto internal_source = std::make_unique<StorageS3Source>(
read_from_format_info, configuration.format, getName(), local_context, format_settings,
info, configuration.format, getName(), local_context, format_settings,
max_block_size,
configuration_snapshot.request_settings,
configuration_snapshot.compression_method,
@ -253,7 +334,7 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
configuration_snapshot.url.bucket,
configuration_snapshot.url.version_id,
configuration_snapshot.url.uri.getHost() + std::to_string(configuration_snapshot.url.uri.getPort()),
file_iterator, local_context->getSettingsRef().max_download_threads, false, /* query_info */ std::nullopt);
file_iterator, local_context->getSettingsRef().max_download_threads, false);
auto file_deleter = [this, bucket = configuration_snapshot.url.bucket, client = configuration_snapshot.client, blob_storage_log = BlobStorageLogWriter::create()](const std::string & path) mutable
{
@ -277,8 +358,8 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
};
auto s3_queue_log = s3queue_settings->s3queue_enable_logging_to_s3queue_log ? local_context->getS3QueueLog() : nullptr;
return std::make_shared<StorageS3QueueSource>(
getName(), read_from_format_info.source_header, std::move(internal_source),
files_metadata, after_processing, file_deleter, read_from_format_info.requested_virtual_columns,
getName(), info.source_header, std::move(internal_source),
files_metadata, after_processing, file_deleter, info.requested_virtual_columns,
local_context, shutdown_called, table_is_being_dropped, s3_queue_log, getStorageID(), log);
}
@ -375,13 +456,14 @@ bool StorageS3Queue::streamToViews()
auto block_io = interpreter.execute();
auto file_iterator = createFileIterator(s3queue_context, nullptr);
auto read_from_format_info = prepareReadingFromFormat(block_io.pipeline.getHeader().getNames(), storage_snapshot, supportsSubsetOfColumns(s3queue_context), getVirtuals());
Pipes pipes;
pipes.reserve(s3queue_settings->s3queue_processing_threads_num);
for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
{
auto source = createSource(
file_iterator, block_io.pipeline.getHeader().getNames(),
storage_snapshot, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
read_from_format_info, file_iterator, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
pipes.emplace_back(std::move(source));
}
@ -479,10 +561,10 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const
}
}
std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate)
{
auto glob_iterator = std::make_unique<StorageS3QueueSource::GlobIterator>(
*configuration.client, configuration.url, query, virtual_columns, local_context,
*configuration.client, configuration.url, predicate, virtual_columns, local_context,
/* read_keys */nullptr, configuration.request_settings);
return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator), shutdown_called);
}

View File

@ -39,10 +39,11 @@ public:
String getName() const override { return "S3Queue"; }
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
@ -57,6 +58,7 @@ public:
zkutil::ZooKeeperPtr getZooKeeper() const;
private:
friend class ReadFromS3Queue;
using FileIterator = StorageS3QueueSource::FileIterator;
const std::unique_ptr<S3QueueSettings> s3queue_settings;
@ -85,11 +87,10 @@ private:
bool supportsSubsetOfColumns(const ContextPtr & context_) const;
bool supportsSubcolumns() const override { return true; }
std::shared_ptr<FileIterator> createFileIterator(ContextPtr local_context, ASTPtr query);
std::shared_ptr<FileIterator> createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate);
std::shared_ptr<StorageS3QueueSource> createSource(
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
size_t max_block_size,
ContextPtr local_context);

View File

@ -1,6 +1,5 @@
#include <Storages/StorageAzureBlob.h>
#if USE_AZURE_BLOB_STORAGE
#include <Formats/FormatFactory.h>
#include <Interpreters/evaluateConstantExpression.h>
@ -21,6 +20,9 @@
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageSnapshot.h>
@ -666,7 +668,58 @@ private:
}
Pipe StorageAzureBlob::read(
class ReadFromAzureBlob : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromAzureBlob"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void applyFilters() override;
ReadFromAzureBlob(
Block sample_block,
std::shared_ptr<StorageAzureBlob> storage_,
ReadFromFormatInfo info_,
const bool need_only_count_,
ContextPtr context_,
size_t max_block_size_,
size_t num_streams_)
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
, storage(std::move(storage_))
, info(std::move(info_))
, need_only_count(need_only_count_)
, context(std::move(context_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
}
private:
std::shared_ptr<StorageAzureBlob> storage;
ReadFromFormatInfo info;
const bool need_only_count;
ContextPtr context;
size_t max_block_size;
const size_t num_streams;
std::shared_ptr<StorageAzureBlobSource::IIterator> iterator_wrapper;
void createIterator(const ActionsDAG::Node * predicate);
};
void ReadFromAzureBlob::applyFilters()
{
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
const ActionsDAG::Node * predicate = nullptr;
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
createIterator(predicate);
}
void StorageAzureBlob::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -678,51 +731,83 @@ Pipe StorageAzureBlob::read(
if (partition_by && configuration.withWildcard())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned Azure storage is not implemented yet");
Pipes pipes;
std::shared_ptr<StorageAzureBlobSource::IIterator> iterator_wrapper;
if (distributed_processing)
{
iterator_wrapper = std::make_shared<StorageAzureBlobSource::ReadIterator>(local_context,
local_context->getReadTaskCallback());
}
else if (configuration.withGlobs())
{
/// Iterate through disclosed globs and make a source for each file
iterator_wrapper = std::make_shared<StorageAzureBlobSource::GlobIterator>(
object_storage.get(), configuration.container, configuration.blob_path,
query_info.query, virtual_columns, local_context, nullptr, local_context->getFileProgressCallback());
}
else
{
iterator_wrapper = std::make_shared<StorageAzureBlobSource::KeysIterator>(
object_storage.get(), configuration.container, configuration.blobs_paths,
query_info.query, virtual_columns, local_context, nullptr, local_context->getFileProgressCallback());
}
auto this_ptr = std::static_pointer_cast<StorageAzureBlob>(shared_from_this());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
auto reading = std::make_unique<ReadFromAzureBlob>(
read_from_format_info.source_header,
std::move(this_ptr),
std::move(read_from_format_info),
need_only_count,
local_context,
max_block_size,
num_streams);
query_plan.addStep(std::move(reading));
}
void ReadFromAzureBlob::createIterator(const ActionsDAG::Node * predicate)
{
if (iterator_wrapper)
return;
const auto & configuration = storage->configuration;
if (storage->distributed_processing)
{
iterator_wrapper = std::make_shared<StorageAzureBlobSource::ReadIterator>(context,
context->getReadTaskCallback());
}
else if (configuration.withGlobs())
{
/// Iterate through disclosed globs and make a source for each file
iterator_wrapper = std::make_shared<StorageAzureBlobSource::GlobIterator>(
storage->object_storage.get(), configuration.container, configuration.blob_path,
predicate, storage->virtual_columns, context, nullptr, context->getFileProgressCallback());
}
else
{
iterator_wrapper = std::make_shared<StorageAzureBlobSource::KeysIterator>(
storage->object_storage.get(), configuration.container, configuration.blobs_paths,
predicate, storage->virtual_columns, context, nullptr, context->getFileProgressCallback());
}
}
void ReadFromAzureBlob::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createIterator(nullptr);
const auto & configuration = storage->configuration;
Pipes pipes;
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageAzureBlobSource>(
read_from_format_info,
info,
configuration.format,
getName(),
local_context,
format_settings,
context,
storage->format_settings,
max_block_size,
configuration.compression_method,
object_storage.get(),
storage->object_storage.get(),
configuration.container,
configuration.connection_url,
iterator_wrapper,
need_only_count,
query_info));
need_only_count));
}
return Pipe::unitePipes(std::move(pipes));
auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty())
pipe = Pipe(std::make_shared<NullSource>(info.source_header));
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe));
}
SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
@ -829,7 +914,7 @@ StorageAzureBlobSource::GlobIterator::GlobIterator(
AzureObjectStorage * object_storage_,
const std::string & container_,
String blob_path_with_globs_,
ASTPtr query_,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs_,
@ -838,7 +923,6 @@ StorageAzureBlobSource::GlobIterator::GlobIterator(
, object_storage(object_storage_)
, container(container_)
, blob_path_with_globs(blob_path_with_globs_)
, query(query_)
, virtual_columns(virtual_columns_)
, outer_blobs(outer_blobs_)
, file_progress_callback(file_progress_callback_)
@ -870,6 +954,8 @@ StorageAzureBlobSource::GlobIterator::GlobIterator(
ErrorCodes::CANNOT_COMPILE_REGEXP, "Cannot compile regex from glob ({}): {}", blob_path_with_globs, matcher->error());
recursive = blob_path_with_globs == "/**" ? true : false;
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
}
RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next()
@ -909,20 +995,15 @@ RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next()
}
index = 0;
if (!is_initialized)
{
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(container) / new_batch.front().relative_path, getContext());
is_initialized = true;
}
if (filter_ast)
if (filter_dag)
{
std::vector<String> paths;
paths.reserve(new_batch.size());
for (auto & path_with_metadata : new_batch)
paths.push_back(fs::path(container) / path_with_metadata.relative_path);
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, query, virtual_columns, getContext(), filter_ast);
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_dag, virtual_columns, getContext());
}
if (outer_blobs)
@ -948,7 +1029,7 @@ StorageAzureBlobSource::KeysIterator::KeysIterator(
AzureObjectStorage * object_storage_,
const std::string & container_,
const Strings & keys_,
ASTPtr query_,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs,
@ -956,23 +1037,22 @@ StorageAzureBlobSource::KeysIterator::KeysIterator(
: IIterator(context_)
, object_storage(object_storage_)
, container(container_)
, query(query_)
, virtual_columns(virtual_columns_)
{
Strings all_keys = keys_;
ASTPtr filter_ast;
if (!all_keys.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(container) / all_keys[0], getContext());
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
if (filter_ast)
if (filter_dag)
{
Strings paths;
paths.reserve(all_keys.size());
for (const auto & key : all_keys)
paths.push_back(fs::path(container) / key);
VirtualColumnUtils::filterByPathOrFile(all_keys, paths, query, virtual_columns, getContext(), filter_ast);
VirtualColumnUtils::filterByPathOrFile(all_keys, paths, filter_dag, virtual_columns, getContext());
}
for (auto && key : all_keys)
@ -1078,8 +1158,7 @@ StorageAzureBlobSource::StorageAzureBlobSource(
const String & container_,
const String & connection_url_,
std::shared_ptr<IIterator> file_iterator_,
bool need_only_count_,
const SelectQueryInfo & query_info_)
bool need_only_count_)
:ISource(info.source_header, false)
, WithContext(context_)
, requested_columns(info.requested_columns)
@ -1096,7 +1175,6 @@ StorageAzureBlobSource::StorageAzureBlobSource(
, connection_url(connection_url_)
, file_iterator(file_iterator_)
, need_only_count(need_only_count_)
, query_info(query_info_)
, create_reader_pool(CurrentMetrics::ObjectStorageAzureThreads, CurrentMetrics::ObjectStorageAzureThreadsActive, CurrentMetrics::ObjectStorageAzureThreadsScheduled, 1)
, create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "AzureReader"))
{

View File

@ -88,7 +88,8 @@ public:
return name;
}
Pipe read(
void read(
QueryPlan & query_plan,
const Names &,
const StorageSnapshotPtr &,
SelectQueryInfo &,
@ -126,6 +127,8 @@ public:
bool distributed_processing = false);
private:
friend class ReadFromAzureBlob;
std::string name;
Configuration configuration;
std::unique_ptr<AzureObjectStorage> object_storage;
@ -156,7 +159,7 @@ public:
AzureObjectStorage * object_storage_,
const std::string & container_,
String blob_path_with_globs_,
ASTPtr query_,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs_,
@ -169,8 +172,7 @@ public:
AzureObjectStorage * object_storage;
std::string container;
String blob_path_with_globs;
ASTPtr query;
ASTPtr filter_ast;
ActionsDAGPtr filter_dag;
NamesAndTypesList virtual_columns;
size_t index = 0;
@ -184,7 +186,6 @@ public:
void createFilterAST(const String & any_key);
bool is_finished = false;
bool is_initialized = false;
std::mutex next_mutex;
std::function<void(FileProgress)> file_progress_callback;
@ -212,7 +213,7 @@ public:
AzureObjectStorage * object_storage_,
const std::string & container_,
const Strings & keys_,
ASTPtr query_,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs,
@ -226,7 +227,7 @@ public:
std::string container;
RelativePathsWithMetadata keys;
ASTPtr query;
ActionsDAGPtr filter_dag;
NamesAndTypesList virtual_columns;
std::atomic<size_t> index = 0;
@ -244,8 +245,7 @@ public:
const String & container_,
const String & connection_url_,
std::shared_ptr<IIterator> file_iterator_,
bool need_only_count_,
const SelectQueryInfo & query_info_);
bool need_only_count_);
~StorageAzureBlobSource() override;
Chunk generate() override;
@ -271,7 +271,6 @@ private:
std::shared_ptr<IIterator> file_iterator;
bool need_only_count;
size_t total_rows_in_file = 0;
SelectQueryInfo query_info;
struct ReaderHolder
{

View File

@ -69,11 +69,11 @@ void StorageAzureBlobCluster::addColumnsStructureToQuery(ASTPtr & query, const S
TableFunctionAzureBlobStorageCluster::addColumnsStructureToArguments(expression_list->children, structure, context);
}
RemoteQueryExecutor::Extension StorageAzureBlobCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
RemoteQueryExecutor::Extension StorageAzureBlobCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const
{
auto iterator = std::make_shared<StorageAzureBlobSource::GlobIterator>(
object_storage.get(), configuration.container, configuration.blob_path,
query, virtual_columns, context, nullptr);
predicate, virtual_columns, context, nullptr);
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String{ return iterator->next().relative_path; });
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}

View File

@ -34,7 +34,7 @@ public:
NamesAndTypesList getVirtuals() const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
bool supportsSubcolumns() const override { return true; }

View File

@ -91,6 +91,7 @@
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sinks/EmptySink.h>
@ -1068,15 +1069,67 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistribu
return pipeline;
}
static ActionsDAGPtr getFilterFromQuery(const ASTPtr & ast, ContextPtr context)
{
QueryPlan plan;
SelectQueryOptions options;
options.only_analyze = true;
if (context->getSettingsRef().allow_experimental_analyzer)
{
InterpreterSelectQueryAnalyzer interpreter(ast, context, options);
plan = std::move(interpreter).extractQueryPlan();
}
else
{
InterpreterSelectWithUnionQuery interpreter(ast, context, options);
interpreter.buildQueryPlan(plan);
}
plan.optimize(QueryPlanOptimizationSettings::fromContext(context));
std::stack<QueryPlan::Node *> nodes;
nodes.push(plan.getRootNode());
SourceStepWithFilter * source = nullptr;
while (!nodes.empty())
{
const auto * node = nodes.top();
nodes.pop();
if (auto * with_filter = dynamic_cast<SourceStepWithFilter *>(node->step.get()))
{
if (source)
{
WriteBufferFromOwnString buf;
plan.explainPlan(buf, {});
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Found multiple source steps for query\n{}\nPlan\n{}",
queryToString(ast), buf.str());
}
source = with_filter;
}
}
if (!source)
return nullptr;
return ActionsDAG::buildFilterActionsDAG(source->getFilterNodes().nodes, {}, context);
}
std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context) const
{
const auto & settings = local_context->getSettingsRef();
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
auto filter = getFilterFromQuery(query.select, local_context);
const ActionsDAG::Node * predicate = nullptr;
if (filter)
predicate = filter->getOutputs().at(0);
/// Select query is needed for pruning on virtual columns
auto extension = src_storage_cluster.getTaskIteratorExtension(
select.list_of_selects->children.at(0)->as<ASTSelectQuery>()->clone(),
local_context);
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context);
auto dst_cluster = getCluster();
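Distributed INSERT SELECT has no source step of its own to receive pushed-down filters, so the new getFilterFromQuery helper above builds an analyze-only plan for the SELECT, optimizes it, walks the plan for the single SourceStepWithFilter and converts its filter nodes into the ActionsDAG whose first output becomes the pruning predicate passed to getTaskIteratorExtension. A self-contained sketch of that traversal (stand-in Node/Step types; the descent into child nodes is written out explicitly here):

```cpp
#include <memory>
#include <stack>
#include <stdexcept>
#include <vector>

struct Step { virtual ~Step() = default; };
struct SourceStepWithFilter : Step { /* holds the pushed-down filter nodes */ };

struct Node
{
    std::unique_ptr<Step> step;
    std::vector<Node *> children;
};

/// Depth-first walk with an explicit stack; at most one source step is expected.
SourceStepWithFilter * findSingleSource(Node * root)
{
    std::stack<Node *> nodes;
    nodes.push(root);
    SourceStepWithFilter * source = nullptr;

    while (!nodes.empty())
    {
        Node * node = nodes.top();
        nodes.pop();

        if (auto * with_filter = dynamic_cast<SourceStepWithFilter *>(node->step.get()))
        {
            if (source)             /// two sources would make the filter ambiguous
                throw std::logic_error("found multiple source steps in plan");
            source = with_filter;
        }

        for (Node * child : node->children)
            nodes.push(child);
    }
    return source;                  /// may be nullptr: then no pruning is possible
}
```

Finding more than one source step is treated as an error, mirroring the LOGICAL_ERROR thrown by the helper itself.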

View File

@ -9,6 +9,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTIdentifier_fwd.h>
@ -37,6 +38,8 @@
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
@ -929,22 +932,21 @@ static std::chrono::seconds getLockTimeout(ContextPtr context)
using StorageFilePtr = std::shared_ptr<StorageFile>;
StorageFileSource::FilesIterator::FilesIterator(
const Strings & files_,
std::optional<StorageFile::ArchiveInfo> archive_info_,
ASTPtr query,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns,
ContextPtr context_,
bool distributed_processing_)
: files(files_), archive_info(std::move(archive_info_)), distributed_processing(distributed_processing_), context(context_)
{
ASTPtr filter_ast;
if (!distributed_processing && !archive_info && !files.empty() && !files[0].empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, files[0], context_);
ActionsDAGPtr filter_dag;
if (!distributed_processing && !archive_info && !files.empty())
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
if (filter_ast)
VirtualColumnUtils::filterByPathOrFile(files, files, query, virtual_columns, context_, filter_ast);
if (filter_dag)
VirtualColumnUtils::filterByPathOrFile(files, files, filter_dag, virtual_columns, context_);
}
String StorageFileSource::FilesIterator::next()
@ -974,16 +976,13 @@ const String & StorageFileSource::FilesIterator::getFileNameInArchive()
StorageFileSource::StorageFileSource(
const ReadFromFormatInfo & info,
std::shared_ptr<StorageFile> storage_,
const StorageSnapshotPtr & storage_snapshot_,
ContextPtr context_,
const SelectQueryInfo & query_info_,
UInt64 max_block_size_,
FilesIteratorPtr files_iterator_,
std::unique_ptr<ReadBuffer> read_buf_,
bool need_only_count_)
: SourceWithKeyCondition(info.source_header, false)
, storage(std::move(storage_))
, storage_snapshot(storage_snapshot_)
, files_iterator(std::move(files_iterator_))
, read_buf(std::move(read_buf_))
, columns_description(info.columns_description)
@ -991,7 +990,6 @@ StorageFileSource::StorageFileSource(
, requested_virtual_columns(info.requested_virtual_columns)
, block_for_format(info.format_header)
, context(context_)
, query_info(query_info_)
, max_block_size(max_block_size_)
, need_only_count(need_only_count_)
{
@ -1322,14 +1320,64 @@ std::optional<size_t> StorageFileSource::tryGetNumRowsFromCache(const String & p
return schema_cache.tryGetNumRows(key, get_last_mod_time);
}
Pipe StorageFile::read(
class ReadFromFile : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromFile"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void applyFilters() override;
ReadFromFile(
Block sample_block,
std::shared_ptr<StorageFile> storage_,
ReadFromFormatInfo info_,
const bool need_only_count_,
ContextPtr context_,
size_t max_block_size_,
size_t num_streams_)
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
, storage(std::move(storage_))
, info(std::move(info_))
, need_only_count(need_only_count_)
, context(std::move(context_))
, max_block_size(max_block_size_)
, max_num_streams(num_streams_)
{
}
private:
std::shared_ptr<StorageFile> storage;
ReadFromFormatInfo info;
const bool need_only_count;
ContextPtr context;
size_t max_block_size;
const size_t max_num_streams;
std::shared_ptr<StorageFileSource::FilesIterator> files_iterator;
void createIterator(const ActionsDAG::Node * predicate);
};
void ReadFromFile::applyFilters()
{
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
const ActionsDAG::Node * predicate = nullptr;
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
createIterator(predicate);
}
void StorageFile::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
const size_t max_num_streams)
size_t num_streams)
{
if (use_table_fd)
{
@ -1346,24 +1394,58 @@ Pipe StorageFile::read(
if (p->size() == 1 && !fs::exists(p->at(0)))
{
if (context->getSettingsRef().engine_file_empty_if_not_exists)
return Pipe(std::make_shared<NullSource>(storage_snapshot->getSampleBlockForColumns(column_names)));
else
if (!context->getSettingsRef().engine_file_empty_if_not_exists)
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist", p->at(0));
auto header = storage_snapshot->getSampleBlockForColumns(column_names);
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context);
return;
}
}
auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, archive_info, query_info.query, virtual_columns, context, distributed_processing);
auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context->getSettingsRef().optimize_count_from_files;
auto reading = std::make_unique<ReadFromFile>(
read_from_format_info.source_header,
std::move(this_ptr),
std::move(read_from_format_info),
need_only_count,
context,
max_block_size,
num_streams);
query_plan.addStep(std::move(reading));
}
void ReadFromFile::createIterator(const ActionsDAG::Node * predicate)
{
if (files_iterator)
return;
files_iterator = std::make_shared<StorageFileSource::FilesIterator>(
storage->paths,
storage->archive_info,
predicate,
storage->virtual_columns,
context,
storage->distributed_processing);
}
void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createIterator(nullptr);
size_t num_streams = max_num_streams;
size_t files_to_read = 0;
if (archive_info)
files_to_read = archive_info->paths_to_archives.size();
if (storage->archive_info)
files_to_read = storage->archive_info->paths_to_archives.size();
else
files_to_read = paths.size();
files_to_read = storage->paths.size();
if (max_num_streams > files_to_read)
num_streams = files_to_read;
@ -1374,12 +1456,8 @@ Pipe StorageFile::read(
/// Set total number of bytes to process. For progress bar.
auto progress_callback = context->getFileProgressCallback();
if (progress_callback && !archive_info)
progress_callback(FileProgress(0, total_bytes_to_read));
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context), getVirtuals());
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& context->getSettingsRef().optimize_count_from_files;
if (progress_callback && !storage->archive_info)
progress_callback(FileProgress(0, storage->total_bytes_to_read));
for (size_t i = 0; i < num_streams; ++i)
{
@ -1388,22 +1466,35 @@ Pipe StorageFile::read(
/// If yes, then we should use it in StorageFileSource. Atomic bool flag is needed
/// to prevent data race in case of parallel reads.
std::unique_ptr<ReadBuffer> read_buffer;
if (has_peekable_read_buffer_from_fd.exchange(false))
read_buffer = std::move(peekable_read_buffer_from_fd);
if (storage->has_peekable_read_buffer_from_fd.exchange(false))
read_buffer = std::move(storage->peekable_read_buffer_from_fd);
pipes.emplace_back(std::make_shared<StorageFileSource>(
read_from_format_info,
this_ptr,
storage_snapshot,
auto source = std::make_shared<StorageFileSource>(
info,
storage,
context,
query_info,
max_block_size,
files_iterator,
std::move(read_buffer),
need_only_count));
need_only_count);
source->setKeyCondition(filter_nodes.nodes, context);
pipes.emplace_back(std::move(source));
}
return Pipe::unitePipes(std::move(pipes));
auto pipe = Pipe::unitePipes(std::move(pipes));
size_t output_ports = pipe.numOutputPorts();
const bool parallelize_output = context->getSettingsRef().parallelize_output_from_storages;
if (parallelize_output && storage->parallelizeOutputAfterReading(context) && output_ports > 0 && output_ports < max_num_streams)
pipe.resize(max_num_streams);
if (pipe.empty())
pipe = Pipe(std::make_shared<NullSource>(info.source_header));
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe));
}

View File

@ -53,7 +53,8 @@ public:
std::string getName() const override { return "File"; }
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -137,6 +138,7 @@ public:
protected:
friend class StorageFileSource;
friend class StorageFileSink;
friend class ReadFromFile;
private:
void setStorageMetadata(CommonArguments args);
@ -194,7 +196,7 @@ public:
explicit FilesIterator(
const Strings & files_,
std::optional<StorageFile::ArchiveInfo> archive_info_,
ASTPtr query,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns,
ContextPtr context_,
bool distributed_processing_ = false);
@ -234,9 +236,7 @@ private:
StorageFileSource(
const ReadFromFormatInfo & info,
std::shared_ptr<StorageFile> storage_,
const StorageSnapshotPtr & storage_snapshot_,
ContextPtr context_,
const SelectQueryInfo & query_info_,
UInt64 max_block_size_,
FilesIteratorPtr files_iterator_,
std::unique_ptr<ReadBuffer> read_buf_,
@ -269,7 +269,6 @@ private:
std::optional<size_t> tryGetNumRowsFromCache(const String & path, time_t last_mod_time) const;
std::shared_ptr<StorageFile> storage;
StorageSnapshotPtr storage_snapshot;
FilesIteratorPtr files_iterator;
String current_path;
std::optional<size_t> current_file_size;
@ -290,7 +289,6 @@ private:
Block block_for_format;
ContextPtr context; /// TODO Untangle potential issues with context lifetime.
SelectQueryInfo query_info;
UInt64 max_block_size;
bool finished_generate = false;

View File

@ -71,9 +71,9 @@ void StorageFileCluster::addColumnsStructureToQuery(ASTPtr & query, const String
TableFunctionFileCluster::addColumnsStructureToArguments(expression_list->children, structure, context);
}
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const
{
auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, query, virtual_columns, context);
auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, virtual_columns, context);
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}

View File

@ -31,7 +31,7 @@ public:
NamesAndTypesList getVirtuals() const override { return virtual_columns; }
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
bool supportsSubcolumns() const override { return true; }

View File

@ -1,6 +1,4 @@
#include "config.h"
#include <Common/ProfileEvents.h>
#include "Parsers/ASTCreateQuery.h"
#if USE_AWS_S3
@ -16,6 +14,7 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>
@ -42,6 +41,7 @@
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
@ -57,6 +57,7 @@
#include <Common/parseGlobs.h>
#include <Common/quoteString.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Processors/ISource.h>
#include <Processors/Sinks/SinkToStorage.h>
@ -146,7 +147,8 @@ public:
const Names & column_names_,
StorageSnapshotPtr storage_snapshot_,
StorageS3 & storage_,
SelectQueryInfo query_info_,
ReadFromFormatInfo read_from_format_info_,
bool need_only_count_,
ContextPtr context_,
size_t max_block_size_,
size_t num_streams_)
@ -154,106 +156,36 @@ public:
, column_names(column_names_)
, storage_snapshot(std::move(storage_snapshot_))
, storage(storage_)
, query_info(std::move(query_info_))
, read_from_format_info(std::move(read_from_format_info_))
, need_only_count(need_only_count_)
, local_context(std::move(context_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
query_configuration = storage.updateConfigurationAndGetCopy(local_context);
virtual_columns = storage.getVirtuals();
}
private:
Names column_names;
StorageSnapshotPtr storage_snapshot;
StorageS3 & storage;
SelectQueryInfo query_info;
ReadFromFormatInfo read_from_format_info;
bool need_only_count;
StorageS3::Configuration query_configuration;
NamesAndTypesList virtual_columns;
ContextPtr local_context;
size_t max_block_size;
size_t num_streams;
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper;
void createIterator(const ActionsDAG::Node * predicate);
};
static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, const String & bucket, const std::unordered_set<String> & keys)
{
Block virtual_columns_block;
fs::path bucket_path(bucket);
for (const auto & [column_name, column_type] : virtual_columns)
{
if (column_name == "_path")
{
auto column = column_type->createColumn();
for (const auto & key : keys)
column->insert((bucket_path / key).string());
virtual_columns_block.insert({std::move(column), column_type, column_name});
}
else if (column_name == "_file")
{
auto column = column_type->createColumn();
for (const auto & key : keys)
{
auto pos = key.find_last_of('/');
if (pos != std::string::npos)
column->insert(key.substr(pos + 1));
else
column->insert(key);
}
virtual_columns_block.insert({std::move(column), column_type, column_name});
}
else if (column_name == "_key")
{
auto column = column_type->createColumn();
for (const auto & key : keys)
column->insert(key);
virtual_columns_block.insert({std::move(column), column_type, column_name});
}
else
{
auto column = column_type->createColumn();
column->insertManyDefaults(keys.size());
virtual_columns_block.insert({std::move(column), column_type, column_name});
}
}
/// Column _key is mandatory and may not be in virtual_columns list
if (!virtual_columns_block.has("_key"))
{
auto column_type = std::make_shared<DataTypeString>();
auto column = column_type->createColumn();
for (const auto & key : keys)
column->insert(key);
virtual_columns_block.insert({std::move(column), column_type, "_key"});
}
return virtual_columns_block;
}
static std::vector<String> filterKeysForPartitionPruning(
const std::vector<String> & keys,
const String & bucket,
const NamesAndTypesList & virtual_columns,
const std::vector<ActionsDAGPtr> & filter_dags,
ContextPtr context)
{
std::unordered_set<String> result_keys(keys.begin(), keys.end());
for (const auto & filter_dag : filter_dags)
{
if (result_keys.empty())
break;
auto block = getBlockWithVirtuals(virtual_columns, bucket, result_keys);
auto filter_actions = VirtualColumnUtils::splitFilterDagForAllowedInputs(filter_dag->getOutputs().at(0), block);
if (!filter_actions)
continue;
VirtualColumnUtils::filterBlockWithDAG(filter_actions, block, context);
result_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, "_key");
}
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Applied partition pruning {} from {} keys left", result_keys.size(), keys.size());
return std::vector<String>(result_keys.begin(), result_keys.end());
}
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@ -263,7 +195,7 @@ public:
Impl(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr & query_,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
KeysWithInfo * read_keys_,
@ -272,7 +204,6 @@ public:
: WithContext(context_)
, client(client_.clone())
, globbed_uri(globbed_uri_)
, query(query_)
, virtual_columns(virtual_columns_)
, read_keys(read_keys_)
, request_settings(request_settings_)
@ -306,6 +237,8 @@ public:
"Cannot compile regex from glob ({}): {}", globbed_uri.key, matcher->error());
recursive = globbed_uri.key == "/**" ? true : false;
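/// Build the _path/_file filter once from the pushed-down predicate, before any keys are listed.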
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
fillInternalBufferAssumeLocked();
}
@ -424,20 +357,14 @@ private:
return;
}
if (!is_initialized)
{
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(globbed_uri.bucket) / temp_buffer.front()->key, getContext());
is_initialized = true;
}
if (filter_ast)
if (filter_dag)
{
std::vector<String> paths;
paths.reserve(temp_buffer.size());
for (const auto & key_with_info : temp_buffer)
paths.push_back(fs::path(globbed_uri.bucket) / key_with_info->key);
VirtualColumnUtils::filterByPathOrFile(temp_buffer, paths, query, virtual_columns, getContext(), filter_ast);
VirtualColumnUtils::filterByPathOrFile(temp_buffer, paths, filter_dag, virtual_columns, getContext());
}
buffer = std::move(temp_buffer);
@ -479,8 +406,7 @@ private:
S3::URI globbed_uri;
ASTPtr query;
NamesAndTypesList virtual_columns;
bool is_initialized{false};
ASTPtr filter_ast;
ActionsDAGPtr filter_dag;
std::unique_ptr<re2::RE2> matcher;
bool recursive{false};
bool is_finished{false};
@ -498,13 +424,13 @@ private:
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns_,
ContextPtr context,
KeysWithInfo * read_keys_,
const S3Settings::RequestSettings & request_settings_,
std::function<void(FileProgress)> file_progress_callback_)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_columns_, context, read_keys_, request_settings_, file_progress_callback_))
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, predicate, virtual_columns_, context, read_keys_, request_settings_, file_progress_callback_))
{
}
@ -646,8 +572,7 @@ StorageS3Source::StorageS3Source(
const String & url_host_and_port_,
std::shared_ptr<IIterator> file_iterator_,
const size_t max_parsing_threads_,
bool need_only_count_,
std::optional<SelectQueryInfo> query_info_)
bool need_only_count_)
: SourceWithKeyCondition(info.source_header, false)
, WithContext(context_)
, name(std::move(name_))
@ -663,7 +588,6 @@ StorageS3Source::StorageS3Source(
, client(client_)
, sample_block(info.format_header)
, format_settings(format_settings_)
, query_info(std::move(query_info_))
, requested_virtual_columns(info.requested_virtual_columns)
, file_iterator(file_iterator_)
, max_parsing_threads(max_parsing_threads_)
@ -1151,8 +1075,7 @@ static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
const StorageS3::Configuration & configuration,
bool distributed_processing,
ContextPtr local_context,
ASTPtr query,
const std::vector<ActionsDAGPtr> & filter_dags,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns,
StorageS3::KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> file_progress_callback = {})
@ -1165,12 +1088,22 @@ static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
{
/// Iterate through disclosed globs and make a source for each file
return std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*configuration.client, configuration.url, query, virtual_columns,
*configuration.client, configuration.url, predicate, virtual_columns,
local_context, read_keys, configuration.request_settings, file_progress_callback);
}
else
{
Strings keys = filterKeysForPartitionPruning(configuration.keys, configuration.url.bucket, virtual_columns, filter_dags, local_context);
Strings keys = configuration.keys;
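/// For an explicit list of keys, prune by the _path/_file virtual columns before
/// creating the iterator, so filtered-out objects are not read at all.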
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
if (filter_dag)
{
std::vector<String> paths;
paths.reserve(keys.size());
for (const auto & key : keys)
paths.push_back(fs::path(configuration.url.bucket) / key);
VirtualColumnUtils::filterByPathOrFile(keys, paths, filter_dag, virtual_columns, local_context);
}
return std::make_shared<StorageS3Source::KeysIterator>(
*configuration.client, configuration.url.version_id, keys,
configuration.url.bucket, configuration.request_settings, read_keys, file_progress_callback);
@ -1204,12 +1137,16 @@ void StorageS3::read(
{
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), virtual_columns);
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
auto reading = std::make_unique<ReadFromStorageS3Step>(
read_from_format_info.source_header,
column_names,
storage_snapshot,
*this,
query_info,
std::move(read_from_format_info),
need_only_count,
local_context,
max_block_size,
num_streams);
@ -1217,19 +1154,32 @@ void StorageS3::read(
query_plan.addStep(std::move(reading));
}
void ReadFromStorageS3Step::applyFilters()
{
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, local_context);
const ActionsDAG::Node * predicate = nullptr;
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
createIterator(predicate);
}
void ReadFromStorageS3Step::createIterator(const ActionsDAG::Node * predicate)
{
if (iterator_wrapper)
return;
iterator_wrapper = createFileIterator(
query_configuration, storage.distributed_processing, local_context, predicate,
virtual_columns, nullptr, local_context->getFileProgressCallback());
}
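applyFilters and createIterator form a build-once pair: when the planner pushes filters into this step, the predicate is taken from the first output of the filter DAG and the file iterator is created immediately, already pruned; initializePipeline later calls createIterator(nullptr) only as a fallback. A minimal sketch of this shape, with hypothetical names and a std::function standing in for the ActionsDAG predicate:

// Sketch only: hypothetical names, a plain callable in place of the ActionsDAG predicate.
#include <functional>
#include <memory>
#include <string>
#include <vector>

using Predicate = std::function<bool(const std::string & path)>;

struct FileIterator
{
    std::vector<std::string> files;
};

class ReadStepSketch
{
public:
    void applyFilters(Predicate predicate) { createIterator(std::move(predicate)); }

    void initializePipeline()
    {
        createIterator({}); /// No-op if applyFilters() already built the iterator.
        /// ... create sources that pull paths from *iterator ...
    }

private:
    void createIterator(Predicate predicate)
    {
        if (iterator)
            return;

        std::vector<std::string> files{"part-000.parquet", "part-001.parquet"};
        if (predicate)
            std::erase_if(files, [&](const std::string & path) { return !predicate(path); });

        iterator = std::make_unique<FileIterator>(FileIterator{std::move(files)});
    }

    std::unique_ptr<FileIterator> iterator;
};

The early return on a non-null iterator is what makes the fallback call from initializePipeline a no-op when filters were already applied.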
void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto query_configuration = storage.updateConfigurationAndGetCopy(local_context);
if (storage.partition_by && query_configuration.withWildcard())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");
auto virtual_columns = storage.getVirtuals();
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, storage.supportsSubsetOfColumns(local_context), virtual_columns);
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
query_configuration, storage.distributed_processing, local_context, query_info.query, filter_dags,
virtual_columns, nullptr, local_context->getFileProgressCallback());
createIterator(nullptr);
size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount();
if (estimated_keys_count > 1)
@ -1238,9 +1188,6 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
/// Disclosed glob iterator can underestimate the number of keys in some cases. We will keep one stream for this particular case.
num_streams = 1;
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
const size_t max_threads = local_context->getSettingsRef().max_threads;
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / std::max(num_streams, 1ul));
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Reading in {} streams, {} threads per stream", num_streams, max_parsing_threads);
@ -1249,7 +1196,7 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
pipes.reserve(num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageS3Source>(
auto source = std::make_shared<StorageS3Source>(
read_from_format_info,
query_configuration.format,
storage.getName(),
@ -1264,17 +1211,20 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()),
iterator_wrapper,
max_parsing_threads,
need_only_count,
query_info));
need_only_count);
source->setKeyCondition(filter_nodes.nodes, local_context);
pipes.emplace_back(std::move(source));
}
pipeline.init(Pipe::unitePipes(std::move(pipes)));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty())
pipe = Pipe(std::make_shared<NullSource>(read_from_format_info.source_header));
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
void ReadFromStorageS3Step::applyFilters()
{
/// We will use filter_dags in filterKeysForPartitionPruning called from initializePipeline, nothing to do here
pipeline.init(std::move(pipe));
}
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
@ -1858,7 +1808,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
{
KeysWithInfo read_keys;
auto file_iterator = createFileIterator(configuration, false, ctx, {}, {}, {}, &read_keys);
auto file_iterator = createFileIterator(configuration, false, ctx, {}, {}, &read_keys);
ReadBufferIterator read_buffer_iterator(file_iterator, read_keys, configuration, format_settings, ctx);
return readSchemaFromFormat(configuration.format, format_settings, read_buffer_iterator, configuration.withGlobs(), ctx);

View File

@ -78,7 +78,7 @@ public:
DisclosedGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
KeysWithInfo * read_keys_ = nullptr,
@ -145,8 +145,7 @@ public:
const String & url_host_and_port,
std::shared_ptr<IIterator> file_iterator_,
size_t max_parsing_threads,
bool need_only_count_,
std::optional<SelectQueryInfo> query_info);
bool need_only_count_);
~StorageS3Source() override;
@ -180,7 +179,6 @@ private:
std::shared_ptr<const S3::Client> client;
Block sample_block;
std::optional<FormatSettings> format_settings;
std::optional<SelectQueryInfo> query_info;
struct ReaderHolder
{

View File

@ -78,10 +78,10 @@ void StorageS3Cluster::updateConfigurationIfChanged(ContextPtr local_context)
s3_configuration.update(local_context);
}
RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const
{
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.url, query, virtual_columns, context, nullptr, s3_configuration.request_settings, context->getFileProgressCallback());
*s3_configuration.client, s3_configuration.url, predicate, virtual_columns, context, nullptr, s3_configuration.request_settings, context->getFileProgressCallback());
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
{

View File

@ -34,7 +34,7 @@ public:
NamesAndTypesList getVirtuals() const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
bool supportsSubcolumns() const override { return true; }

View File

@ -26,6 +26,8 @@
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Common/ThreadStatus.h>
#include <Common/parseRemoteDescription.h>
@ -182,22 +184,22 @@ namespace
class StorageURLSource::DisclosedGlobIterator::Impl
{
public:
Impl(const String & uri_, size_t max_addresses, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
Impl(const String & uri_, size_t max_addresses, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
uris = parseRemoteDescription(uri_, 0, uri_.size(), ',', max_addresses);
ASTPtr filter_ast;
ActionsDAGPtr filter_dag;
if (!uris.empty())
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, Poco::URI(uris[0]).getPath(), context);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
if (filter_ast)
if (filter_dag)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & uri : uris)
paths.push_back(Poco::URI(uri).getPath());
VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, context, filter_ast);
VirtualColumnUtils::filterByPathOrFile(uris, paths, filter_dag, virtual_columns, context);
}
}
@ -220,8 +222,8 @@ private:
std::atomic_size_t index = 0;
};
StorageURLSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, size_t max_addresses, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<StorageURLSource::DisclosedGlobIterator::Impl>(uri, max_addresses, query, virtual_columns, context)) {}
StorageURLSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, size_t max_addresses, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<StorageURLSource::DisclosedGlobIterator::Impl>(uri, max_addresses, predicate, virtual_columns, context)) {}
String StorageURLSource::DisclosedGlobIterator::next()
{
@ -260,7 +262,6 @@ StorageURLSource::StorageURLSource(
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
size_t max_parsing_threads,
const SelectQueryInfo &,
const HTTPHeaderEntries & headers_,
const URIParams & params,
bool glob_url,
@ -874,7 +875,70 @@ bool IStorageURLBase::parallelizeOutputAfterReading(ContextPtr context) const
return FormatFactory::instance().checkParallelizeOutputAfterReading(format_name, context);
}
Pipe IStorageURLBase::read(
class ReadFromURL : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromURL"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void applyFilters() override;
ReadFromURL(
Block sample_block,
std::shared_ptr<IStorageURLBase> storage_,
std::vector<String> * uri_options_,
ReadFromFormatInfo info_,
const bool need_only_count_,
std::vector<std::pair<std::string, std::string>> read_uri_params_,
std::function<void(std::ostream &)> read_post_data_callback_,
ContextPtr context_,
size_t max_block_size_,
size_t num_streams_)
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
, storage(std::move(storage_))
, uri_options(uri_options_)
, info(std::move(info_))
, need_only_count(need_only_count_)
, read_uri_params(std::move(read_uri_params_))
, read_post_data_callback(std::move(read_post_data_callback_))
, context(std::move(context_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
}
private:
std::shared_ptr<IStorageURLBase> storage;
std::vector<String> * uri_options;
ReadFromFormatInfo info;
const bool need_only_count;
std::vector<std::pair<std::string, std::string>> read_uri_params;
std::function<void(std::ostream &)> read_post_data_callback;
ContextPtr context;
size_t max_block_size;
size_t num_streams;
std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper;
bool is_url_with_globs = false;
bool is_empty_glob = false;
void createIterator(const ActionsDAG::Node * predicate);
};
void ReadFromURL::applyFilters()
{
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
const ActionsDAG::Node * predicate = nullptr;
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
createIterator(predicate);
}
void IStorageURLBase::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -884,16 +948,61 @@ Pipe IStorageURLBase::read(
size_t num_streams)
{
auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);
std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr};
bool is_url_with_globs = urlWithGlobs(uri);
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
if (distributed_processing)
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
auto read_post_data_callback = getReadPOSTDataCallback(
read_from_format_info.columns_description.getNamesOfPhysical(),
read_from_format_info.columns_description,
query_info,
local_context,
processed_stage,
max_block_size);
auto this_ptr = std::static_pointer_cast<IStorageURLBase>(shared_from_this());
auto reading = std::make_unique<ReadFromURL>(
read_from_format_info.source_header,
std::move(this_ptr),
nullptr,
std::move(read_from_format_info),
need_only_count,
std::move(params),
std::move(read_post_data_callback),
local_context,
max_block_size,
num_streams);
query_plan.addStep(std::move(reading));
}
void ReadFromURL::createIterator(const ActionsDAG::Node * predicate)
{
if (iterator_wrapper || is_empty_glob)
return;
if (uri_options)
{
iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>([&, done = false]() mutable
{
if (done)
return StorageURLSource::FailoverOptions{};
done = true;
return *uri_options;
});
return;
}
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
is_url_with_globs = urlWithGlobs(storage->uri);
if (storage->distributed_processing)
{
iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>(
[callback = local_context->getReadTaskCallback(), max_addresses]()
[callback = context->getReadTaskCallback(), max_addresses]()
{
String next_uri = callback();
if (next_uri.empty())
@ -904,11 +1013,14 @@ Pipe IStorageURLBase::read(
else if (is_url_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(uri, max_addresses, query_info.query, virtual_columns, local_context);
auto glob_iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(storage->uri, max_addresses, predicate, storage->virtual_columns, context);
/// check if we filtered out all the paths
if (glob_iterator->size() == 0)
return Pipe(std::make_shared<NullSource>(read_from_format_info.source_header));
{
is_empty_glob = true;
return;
}
iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>([glob_iterator, max_addresses]()
{
@ -923,7 +1035,7 @@ Pipe IStorageURLBase::read(
}
else
{
iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>([&, max_addresses, done = false]() mutable
iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>([max_addresses, done = false, &uri = storage->uri]() mutable
{
if (done)
return StorageURLSource::FailoverOptions{};
@ -932,49 +1044,69 @@ Pipe IStorageURLBase::read(
});
num_streams = 1;
}
}
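In the failover and single-URI branches the iterator wrapper is a mutable lambda with a captured done flag: the first call returns the whole set of failover options, every later call returns an empty list. A stand-alone sketch of the idiom, with illustrative names:

#include <string>
#include <vector>

using FailoverOptions = std::vector<std::string>;

auto makeOneShotIterator(FailoverOptions options)
{
    /// Returns the whole batch once, then an empty result forever after.
    return [options = std::move(options), done = false]() mutable -> FailoverOptions
    {
        if (done)
            return {};
        done = true;
        return options;
    };
}

num_streams is forced to 1 in the single-URI branch because one URI (plus its failover alternatives) feeds exactly one source.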
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
void ReadFromURL::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createIterator(nullptr);
if (is_empty_glob)
{
pipeline.init(Pipe(std::make_shared<NullSource>(info.source_header)));
return;
}
Pipes pipes;
pipes.reserve(num_streams);
const size_t max_threads = local_context->getSettingsRef().max_threads;
const size_t max_threads = context->getSettingsRef().max_threads;
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageURLSource>(
read_from_format_info,
auto source = std::make_shared<StorageURLSource>(
info,
iterator_wrapper,
getReadMethod(),
getReadPOSTDataCallback(
read_from_format_info.columns_description.getNamesOfPhysical(),
read_from_format_info.columns_description,
query_info,
local_context,
processed_stage,
max_block_size),
format_name,
format_settings,
getName(),
local_context,
storage->getReadMethod(),
read_post_data_callback,
storage->format_name,
storage->format_settings,
storage->getName(),
context,
max_block_size,
getHTTPTimeouts(local_context),
compression_method,
getHTTPTimeouts(context),
storage->compression_method,
max_parsing_threads,
query_info,
headers,
params,
storage->headers,
read_uri_params,
is_url_with_globs,
need_only_count));
need_only_count);
source->setKeyCondition(filter_nodes.nodes, context);
pipes.emplace_back(std::move(source));
}
return Pipe::unitePipes(std::move(pipes));
if (uri_options)
std::shuffle(uri_options->begin(), uri_options->end(), thread_local_rng);
auto pipe = Pipe::unitePipes(std::move(pipes));
size_t output_ports = pipe.numOutputPorts();
const bool parallelize_output = context->getSettingsRef().parallelize_output_from_storages;
if (parallelize_output && storage->parallelizeOutputAfterReading(context) && output_ports > 0 && output_ports < num_streams)
pipe.resize(num_streams);
if (pipe.empty())
pipe = Pipe(std::make_shared<NullSource>(info.source_header));
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe));
}
Pipe StorageURLWithFailover::read(
void StorageURLWithFailover::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -984,38 +1116,34 @@ Pipe StorageURLWithFailover::read(
size_t num_streams)
{
auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);
auto iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>([&, done = false]() mutable
{
if (done)
return StorageURLSource::FailoverOptions{};
done = true;
return uri_options;
});
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
const size_t max_threads = local_context->getSettingsRef().max_threads;
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
auto pipe = Pipe(std::make_shared<StorageURLSource>(
read_from_format_info,
iterator_wrapper,
getReadMethod(),
getReadPOSTDataCallback(read_from_format_info.columns_description.getNamesOfPhysical(), read_from_format_info.columns_description, query_info, local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
auto read_post_data_callback = getReadPOSTDataCallback(
read_from_format_info.columns_description.getNamesOfPhysical(),
read_from_format_info.columns_description,
query_info,
local_context,
processed_stage,
max_block_size);
auto this_ptr = std::static_pointer_cast<StorageURL>(shared_from_this());
auto reading = std::make_unique<ReadFromURL>(
read_from_format_info.source_header,
std::move(this_ptr),
&uri_options,
std::move(read_from_format_info),
need_only_count,
std::move(params),
std::move(read_post_data_callback),
local_context,
max_block_size,
getHTTPTimeouts(local_context),
compression_method,
max_parsing_threads,
query_info,
headers,
params));
std::shuffle(uri_options.begin(), uri_options.end(), thread_local_rng);
return pipe;
num_streams);
query_plan.addStep(std::move(reading));
}

View File

@ -34,7 +34,8 @@ class PullingPipelineExecutor;
class IStorageURLBase : public IStorage
{
public:
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -67,6 +68,8 @@ public:
const ContextPtr & context);
protected:
friend class ReadFromURL;
IStorageURLBase(
const String & uri_,
ContextPtr context_,
@ -136,7 +139,7 @@ public:
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(const String & uri_, size_t max_addresses, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
DisclosedGlobIterator(const String & uri_, size_t max_addresses, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
String next();
size_t size();
@ -162,7 +165,6 @@ public:
const ConnectionTimeouts & timeouts,
CompressionMethod compression_method,
size_t max_parsing_threads,
const SelectQueryInfo & query_info,
const HTTPHeaderEntries & headers_ = {},
const URIParams & params = {},
bool glob_url = false,
@ -317,7 +319,8 @@ public:
ContextPtr context_,
const String & compression_method_);
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,

View File

@ -81,9 +81,9 @@ void StorageURLCluster::addColumnsStructureToQuery(ASTPtr & query, const String
TableFunctionURLCluster::addColumnsStructureToArguments(expression_list->children, structure, context);
}
RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const
{
auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(uri, context->getSettingsRef().glob_expansion_max_elements, query, virtual_columns, context);
auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(uri, context->getSettingsRef().glob_expansion_max_elements, predicate, virtual_columns, context);
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}

View File

@ -34,7 +34,7 @@ public:
NamesAndTypesList getVirtuals() const override { return virtual_columns; }
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
bool supportsSubcolumns() const override { return true; }

View File

@ -102,7 +102,8 @@ std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(
return write_body_callback;
}
Pipe StorageXDBC::read(
void StorageXDBC::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -114,7 +115,7 @@ Pipe StorageXDBC::read(
storage_snapshot->check(column_names);
bridge_helper->startBridgeSync();
return IStorageURLBase::read(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
IStorageURLBase::read(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
}
SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)

View File

@ -19,7 +19,8 @@ namespace DB
class StorageXDBC : public IStorageURLBase
{
public:
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,

View File

@ -36,7 +36,10 @@
#include <Storages/VirtualColumnUtils.h>
#include <IO/WriteHelpers.h>
#include <Common/typeid_cast.h>
#include "Functions/FunctionsLogical.h"
#include "Functions/IFunction.h"
#include "Functions/IFunctionAdaptors.h"
#include "Functions/indexHint.h"
#include <Parsers/makeASTForLogicalFunction.h>
#include <Columns/ColumnSet.h>
#include <Functions/FunctionHelpers.h>
@ -390,9 +393,9 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s
block.getByName("_idx").column->assumeMutableRef().insert(idx);
}
ASTPtr createPathAndFileFilterAst(const ASTPtr & query, const NamesAndTypesList & virtual_columns, const String & path_example, const ContextPtr & context)
ActionsDAGPtr createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns)
{
if (!query || virtual_columns.empty())
if (!predicate || virtual_columns.empty())
return {};
Block block;
@ -401,16 +404,12 @@ ASTPtr createPathAndFileFilterAst(const ASTPtr & query, const NamesAndTypesList
if (column.name == "_file" || column.name == "_path")
block.insert({column.type->createColumn(), column.type, column.name});
}
/// Create a block with one row to construct filter
/// Append "idx" column as the filter result
block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
addPathAndFileToVirtualColumns(block, path_example, 0);
ASTPtr filter_ast;
prepareFilterBlockWithQuery(query, context, block, filter_ast);
return filter_ast;
return splitFilterDagForAllowedInputs(predicate, block);
}
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context, ASTPtr filter_ast)
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ActionsDAGPtr & dag, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
Block block;
for (const auto & column : virtual_columns)
@ -423,7 +422,7 @@ ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const
for (size_t i = 0; i != paths.size(); ++i)
addPathAndFileToVirtualColumns(block, paths[i], i);
filterBlockWithQuery(query, block, context, filter_ast);
filterBlockWithDAG(dag, block, context);
return block.getByName("_idx").column;
}
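The mechanism here is: materialize one row per candidate path with the _path and _file virtual columns plus an _idx counter, run the filter DAG over that block, and return the surviving _idx values; filterByPathOrFile then drops the sources whose index did not survive. A hedged, self-contained analogue using plain standard types in place of Block and ActionsDAG:

#include <cstdint>
#include <functional>
#include <string>
#include <vector>

struct VirtualRow
{
    std::string path;  /// _path
    std::string file;  /// _file (basename of the path)
    uint64_t idx;      /// _idx, position of the source this row describes
};

/// Returns the indexes of the paths that satisfy the predicate.
std::vector<uint64_t> filterIndexesByPathAndFile(
    const std::vector<std::string> & paths,
    const std::function<bool(const VirtualRow &)> & predicate)
{
    std::vector<uint64_t> surviving;
    for (uint64_t i = 0; i < paths.size(); ++i)
    {
        const auto & path = paths[i];
        const auto slash = path.find_last_of('/');
        const std::string file = slash == std::string::npos ? path : path.substr(slash + 1);
        if (predicate(VirtualRow{path, file, i}))
            surviving.push_back(i);
    }
    return surviving;
}

The filterByPathOrFile template returns early when every index survived; otherwise only the sources at the surviving indexes are kept.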
@ -523,6 +522,37 @@ static const ActionsDAG::Node * splitFilterNodeForAllowedInputs(
return &node_copy;
}
else if (node->function_base->getName() == "indexHint")
{
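/// indexHint wraps a separate DAG of "useful" conditions: recursively split each of its
/// outputs against the allowed inputs, AND the surviving atoms together, cast the result
/// back to indexHint's result type, and splice the detached nodes into additional_nodes
/// so they outlive the cloned DAG.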
if (const auto * adaptor = typeid_cast<const FunctionToFunctionBaseAdaptor *>(node->function_base.get()))
{
if (const auto * index_hint = typeid_cast<const FunctionIndexHint *>(adaptor->getFunction().get()))
{
auto index_hint_dag = index_hint->getActions()->clone();
ActionsDAG::NodeRawConstPtrs atoms;
for (const auto & output : index_hint_dag->getOutputs())
if (const auto * child_copy = splitFilterNodeForAllowedInputs(output, allowed_inputs, additional_nodes))
atoms.push_back(child_copy);
if (!atoms.empty())
{
const auto * res = atoms.at(0);
if (atoms.size() > 1)
{
FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
res = &index_hint_dag->addFunction(func_builder_and, atoms, {});
}
if (!res->result_type->equals(*node->result_type))
res = &index_hint_dag->addCast(*res, node->result_type, {});
additional_nodes.splice(additional_nodes.end(), ActionsDAG::detachNodes(std::move(*index_hint_dag)));
return res;
}
}
}
}
}
if (!canEvaluateSubtree(node, allowed_inputs))

View File

@ -58,14 +58,14 @@ auto extractSingleValueFromBlock(const Block & block, const String & name)
NamesAndTypesList getPathFileAndSizeVirtualsForStorage(NamesAndTypesList storage_columns);
ASTPtr createPathAndFileFilterAst(const ASTPtr & query, const NamesAndTypesList & virtual_columns, const String & path_example, const ContextPtr & context);
ActionsDAGPtr createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context, ASTPtr filter_ast);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ActionsDAGPtr & dag, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
template <typename T>
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context, ASTPtr filter_ast)
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ActionsDAGPtr & dag, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
auto indexes_column = getFilterByPathAndFileIndexes(paths, query, virtual_columns, context, filter_ast);
auto indexes_column = getFilterByPathAndFileIndexes(paths, dag, virtual_columns, context);
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
if (indexes.size() == sources.size())
return;

View File

@ -3,10 +3,10 @@
2
(Expression)
ExpressionTransform
(ReadFromStorage)
(ReadFromFile)
File 0 → 1
(Expression)
ExpressionTransform × 2
(ReadFromStorage)
(ReadFromFile)
Resize 1 → 2
File 0 → 1