mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Refactor StorageHDFS and StorageFile virtual columns filtering
This commit is contained in:
parent
d58b76ce06
commit
5521e5d9b1
@ -15,6 +15,8 @@
|
||||
#include <Processors/Transforms/AddingDefaultsTransform.h>
|
||||
#include <Processors/Transforms/ExtractColumnsTransform.h>
|
||||
#include <Processors/Sources/ConstChunkGenerator.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/SourceStepWithFilter.h>
|
||||
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/CompressionMethod.h>
|
||||
@ -408,6 +410,35 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
|
||||
class HDFSSource::DisclosedGlobIterator::Impl
|
||||
{
|
||||
public:
|
||||
|
||||
/// Collects the path list for `uri` via getPathsList() and, when `predicate` produces a
/// filter over the virtual columns, keeps only the entries that pass it.
/// Afterwards each remaining path is prefixed with the URI part that precedes the path,
/// and entries that carry file info are announced to the file progress callback, if any.
Impl(const String & uri, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
    const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
    uris = getPathsList(path_from_uri, uri_without_path, context);

    if (!uris.empty())
    {
        /// The first path is passed as the example path for constructing the filter DAG.
        if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, uris.front().path))
        {
            std::vector<String> paths;
            paths.reserve(uris.size());
            for (const auto & entry : uris)
                paths.push_back(entry.path);

            VirtualColumnUtils::filterByPathOrFile(uris, paths, filter_dag, virtual_columns, context);
        }
    }

    auto file_progress_callback = context->getFileProgressCallback();

    for (auto & entry : uris)
    {
        /// Filtering above ran on paths without the URI prefix; restore the prefix now.
        entry.path = uri_without_path + entry.path;

        if (file_progress_callback && entry.info)
            file_progress_callback(FileProgress(0, entry.info->size));
    }

    uris_iter = uris.begin();
}
|
||||
|
||||
Impl(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
|
||||
{
|
||||
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
|
||||
@ -456,21 +487,21 @@ private:
|
||||
class HDFSSource::URISIterator::Impl : WithContext
|
||||
{
|
||||
public:
|
||||
explicit Impl(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context_)
|
||||
explicit Impl(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context_)
|
||||
: WithContext(context_), uris(uris_), file_progress_callback(context_->getFileProgressCallback())
|
||||
{
|
||||
ASTPtr filter_ast;
|
||||
ActionsDAGPtr filter_dag;
|
||||
if (!uris.empty())
|
||||
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, getPathFromUriAndUriWithoutPath(uris[0]).first, getContext());
|
||||
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, getPathFromUriAndUriWithoutPath(uris[0]).first);
|
||||
|
||||
if (filter_ast)
|
||||
if (filter_dag)
|
||||
{
|
||||
std::vector<String> paths;
|
||||
paths.reserve(uris.size());
|
||||
for (const auto & uri : uris)
|
||||
paths.push_back(getPathFromUriAndUriWithoutPath(uri).first);
|
||||
|
||||
VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, getContext(), filter_ast);
|
||||
VirtualColumnUtils::filterByPathOrFile(uris, paths, filter_dag, virtual_columns, getContext());
|
||||
}
|
||||
|
||||
if (!uris.empty())
|
||||
@ -520,13 +551,16 @@ private:
|
||||
/// AST-based overload: all work is delegated to the Impl constructor taking the query.
HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(
    const String & uri,
    const ASTPtr & query,
    const NamesAndTypesList & virtual_columns,
    const ContextPtr & context)
    : pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(uri, query, virtual_columns, context))
{
}
|
||||
|
||||
/// Predicate-based overload: all work is delegated to the Impl constructor taking the
/// filter predicate node (which may be null).
HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(
    const String & uri,
    const ActionsDAG::Node * predicate,
    const NamesAndTypesList & virtual_columns,
    const ContextPtr & context)
    : pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(uri, predicate, virtual_columns, context))
{
}
|
||||
|
||||
/// Returns whatever the underlying implementation object yields next.
StorageHDFS::PathWithInfo HDFSSource::DisclosedGlobIterator::next()
{
    auto & impl = *pimpl;
    return impl.next();
}
|
||||
|
||||
HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
|
||||
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, query, virtual_columns, context))
|
||||
HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
|
||||
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, predicate, virtual_columns, context))
|
||||
{
|
||||
}
|
||||
|
||||
@ -541,8 +575,8 @@ HDFSSource::HDFSSource(
|
||||
ContextPtr context_,
|
||||
UInt64 max_block_size_,
|
||||
std::shared_ptr<IteratorWrapper> file_iterator_,
|
||||
bool need_only_count_,
|
||||
const SelectQueryInfo & query_info_)
|
||||
bool need_only_count_)
|
||||
//const SelectQueryInfo & query_info_)
|
||||
: ISource(info.source_header, false)
|
||||
, WithContext(context_)
|
||||
, storage(std::move(storage_))
|
||||
@ -553,7 +587,7 @@ HDFSSource::HDFSSource(
|
||||
, file_iterator(file_iterator_)
|
||||
, columns_description(info.columns_description)
|
||||
, need_only_count(need_only_count_)
|
||||
, query_info(query_info_)
|
||||
//, query_info(query_info_)
|
||||
{
|
||||
initialize();
|
||||
}
|
||||
@ -843,7 +877,82 @@ bool StorageHDFS::supportsSubsetOfColumns(const ContextPtr & context_) const
|
||||
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context_);
|
||||
}
|
||||
|
||||
Pipe StorageHDFS::read(
|
||||
class ReadFromHDFS : public SourceStepWithFilter
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "ReadFromHDFS"; }
|
||||
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
void applyFilters() override;
|
||||
|
||||
ReadFromHDFS(
|
||||
Block sample_block,
|
||||
std::vector<String> uris_,
|
||||
bool distributed_processing_,
|
||||
NamesAndTypesList virtual_columns_,
|
||||
bool is_path_with_globs_,
|
||||
ReadFromFormatInfo info_,
|
||||
bool need_only_count_,
|
||||
std::shared_ptr<StorageHDFS> storage_,
|
||||
// StorageSnapshotPtr storage_snapshot_,
|
||||
// const StorageEmbeddedRocksDB & storage_,
|
||||
// SelectQueryInfo query_info_,
|
||||
ContextPtr context_,
|
||||
size_t max_block_size_,
|
||||
size_t num_streams_)
|
||||
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
|
||||
, uris(std::move(uris_))
|
||||
, distributed_processing(distributed_processing_)
|
||||
, virtual_columns(std::move(virtual_columns_))
|
||||
, is_path_with_globs(is_path_with_globs_)
|
||||
, info(std::move(info_))
|
||||
, need_only_count(need_only_count_)
|
||||
, storage(std::move(storage_))
|
||||
// , storage_snapshot(std::move(storage_snapshot_))
|
||||
// , storage(storage_)
|
||||
// , query_info(std::move(query_info_))
|
||||
, context(std::move(context_))
|
||||
, max_block_size(max_block_size_)
|
||||
, num_streams(num_streams_)
|
||||
{
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<String> uris;
|
||||
const bool distributed_processing;
|
||||
NamesAndTypesList virtual_columns;
|
||||
bool is_path_with_globs;
|
||||
ReadFromFormatInfo info;
|
||||
const bool need_only_count;
|
||||
std::shared_ptr<StorageHDFS> storage;
|
||||
|
||||
// StorageSnapshotPtr storage_snapshot;
|
||||
// const StorageEmbeddedRocksDB & storage;
|
||||
// SelectQueryInfo query_info;
|
||||
ContextPtr context;
|
||||
|
||||
size_t max_block_size;
|
||||
size_t num_streams;
|
||||
|
||||
std::shared_ptr<HDFSSource::IteratorWrapper> iterator_wrapper;
|
||||
|
||||
// FieldVectorPtr keys;
|
||||
// bool all_scan = false;
|
||||
|
||||
void createIterator(const ActionsDAG::Node * predicate);
|
||||
};
|
||||
|
||||
void ReadFromHDFS::applyFilters()
|
||||
{
|
||||
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
|
||||
const ActionsDAG::Node * predicate = nullptr;
|
||||
if (filter_actions_dag)
|
||||
predicate = filter_actions_dag->getOutputs().at(0);
|
||||
|
||||
createIterator(predicate);
|
||||
}
|
||||
|
||||
void StorageHDFS::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -852,18 +961,44 @@ Pipe StorageHDFS::read(
|
||||
size_t max_block_size,
|
||||
size_t num_streams)
|
||||
{
|
||||
std::shared_ptr<HDFSSource::IteratorWrapper> iterator_wrapper{nullptr};
|
||||
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), virtual_columns);
|
||||
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
|
||||
&& context_->getSettingsRef().optimize_count_from_files;
|
||||
|
||||
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
|
||||
|
||||
auto reading = std::make_unique<ReadFromHDFS>(
|
||||
read_from_format_info.source_header,
|
||||
uris,
|
||||
distributed_processing,
|
||||
virtual_columns,
|
||||
is_path_with_globs,
|
||||
std::move(read_from_format_info),
|
||||
need_only_count,
|
||||
std::move(this_ptr),
|
||||
context_,
|
||||
max_block_size,
|
||||
num_streams);
|
||||
|
||||
query_plan.addStep(std::move(reading));
|
||||
}
|
||||
|
||||
void ReadFromHDFS::createIterator(const ActionsDAG::Node * predicate)
|
||||
{
|
||||
if (iterator_wrapper)
|
||||
return;
|
||||
|
||||
if (distributed_processing)
|
||||
{
|
||||
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>(
|
||||
[callback = context_->getReadTaskCallback()]() -> StorageHDFS::PathWithInfo {
|
||||
[callback = context->getReadTaskCallback()]() -> StorageHDFS::PathWithInfo {
|
||||
return StorageHDFS::PathWithInfo{callback(), std::nullopt};
|
||||
});
|
||||
}
|
||||
else if (is_path_with_globs)
|
||||
{
|
||||
/// Iterate through disclosed globs and make a source for each file
|
||||
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uris[0], query_info.query, virtual_columns, context_);
|
||||
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uris[0], predicate, virtual_columns, context);
|
||||
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
|
||||
{
|
||||
return glob_iterator->next();
|
||||
@ -871,31 +1006,31 @@ Pipe StorageHDFS::read(
|
||||
}
|
||||
else
|
||||
{
|
||||
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, query_info.query, virtual_columns, context_);
|
||||
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, predicate, virtual_columns, context);
|
||||
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
|
||||
{
|
||||
return uris_iterator->next();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), getVirtuals());
|
||||
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
|
||||
&& context_->getSettingsRef().optimize_count_from_files;
|
||||
void ReadFromHDFS::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
createIterator(nullptr);
|
||||
|
||||
Pipes pipes;
|
||||
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
|
||||
for (size_t i = 0; i < num_streams; ++i)
|
||||
{
|
||||
pipes.emplace_back(std::make_shared<HDFSSource>(
|
||||
read_from_format_info,
|
||||
this_ptr,
|
||||
context_,
|
||||
info,
|
||||
storage,
|
||||
context,
|
||||
max_block_size,
|
||||
iterator_wrapper,
|
||||
need_only_count,
|
||||
query_info));
|
||||
need_only_count)); //,
|
||||
//query_info));
|
||||
}
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
pipeline.init(Pipe::unitePipes(std::move(pipes)));
|
||||
}
|
||||
|
||||
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_, bool /*async_insert*/)
|
||||
|
@ -51,7 +51,8 @@ public:
|
||||
|
||||
String getName() const override { return "HDFS"; }
|
||||
|
||||
Pipe read(
|
||||
void read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -115,6 +116,7 @@ public:
|
||||
{
|
||||
public:
|
||||
DisclosedGlobIterator(const String & uri_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
|
||||
DisclosedGlobIterator(const String & uri_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
|
||||
StorageHDFS::PathWithInfo next();
|
||||
private:
|
||||
class Impl;
|
||||
@ -125,7 +127,7 @@ public:
|
||||
class URISIterator
|
||||
{
|
||||
public:
|
||||
URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
|
||||
URISIterator(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
|
||||
StorageHDFS::PathWithInfo next();
|
||||
private:
|
||||
class Impl;
|
||||
@ -142,8 +144,8 @@ public:
|
||||
ContextPtr context_,
|
||||
UInt64 max_block_size_,
|
||||
std::shared_ptr<IteratorWrapper> file_iterator_,
|
||||
bool need_only_count_,
|
||||
const SelectQueryInfo & query_info_);
|
||||
bool need_only_count_);
|
||||
//const SelectQueryInfo & query_info_);
|
||||
|
||||
String getName() const override;
|
||||
|
||||
@ -162,7 +164,7 @@ private:
|
||||
ColumnsDescription columns_description;
|
||||
bool need_only_count;
|
||||
size_t total_rows_in_file = 0;
|
||||
SelectQueryInfo query_info;
|
||||
//SelectQueryInfo query_info;
|
||||
|
||||
std::unique_ptr<ReadBuffer> read_buf;
|
||||
std::shared_ptr<IInputFormat> input_format;
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTIdentifier_fwd.h>
|
||||
@ -44,6 +45,8 @@
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/SourceStepWithFilter.h>
|
||||
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
@ -947,6 +950,23 @@ StorageFileSource::FilesIterator::FilesIterator(
|
||||
VirtualColumnUtils::filterByPathOrFile(files, files, query, virtual_columns, context_, filter_ast);
|
||||
}
|
||||
|
||||
/// Predicate-based constructor: stores the file list and, where applicable, narrows it
/// with a filter over the virtual columns derived from `predicate`.
/// Filtering is skipped for distributed processing, for archives, and when the list is
/// empty or its first entry is an empty string.
StorageFileSource::FilesIterator::FilesIterator(
    const Strings & files_,
    std::optional<StorageFile::ArchiveInfo> archive_info_,
    const ActionsDAG::Node * predicate,
    const NamesAndTypesList & virtual_columns,
    ContextPtr context_,
    bool distributed_processing_)
    : files(files_), archive_info(std::move(archive_info_)), distributed_processing(distributed_processing_), context(context_)
{
    const bool filtering_applicable = !distributed_processing && !archive_info && !files.empty() && !files[0].empty();
    if (!filtering_applicable)
        return;

    /// The first file is passed as the example path for constructing the filter DAG.
    if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, files[0]))
        VirtualColumnUtils::filterByPathOrFile(files, files, filter_dag, virtual_columns, context_);
}
|
||||
|
||||
String StorageFileSource::FilesIterator::next()
|
||||
{
|
||||
if (distributed_processing)
|
||||
@ -974,16 +994,13 @@ const String & StorageFileSource::FilesIterator::getFileNameInArchive()
|
||||
StorageFileSource::StorageFileSource(
|
||||
const ReadFromFormatInfo & info,
|
||||
std::shared_ptr<StorageFile> storage_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
ContextPtr context_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
UInt64 max_block_size_,
|
||||
FilesIteratorPtr files_iterator_,
|
||||
std::unique_ptr<ReadBuffer> read_buf_,
|
||||
bool need_only_count_)
|
||||
: SourceWithKeyCondition(info.source_header, false)
|
||||
, storage(std::move(storage_))
|
||||
, storage_snapshot(storage_snapshot_)
|
||||
, files_iterator(std::move(files_iterator_))
|
||||
, read_buf(std::move(read_buf_))
|
||||
, columns_description(info.columns_description)
|
||||
@ -991,7 +1008,6 @@ StorageFileSource::StorageFileSource(
|
||||
, requested_virtual_columns(info.requested_virtual_columns)
|
||||
, block_for_format(info.format_header)
|
||||
, context(context_)
|
||||
, query_info(query_info_)
|
||||
, max_block_size(max_block_size_)
|
||||
, need_only_count(need_only_count_)
|
||||
{
|
||||
@ -1322,14 +1338,87 @@ std::optional<size_t> StorageFileSource::tryGetNumRowsFromCache(const String & p
|
||||
return schema_cache.tryGetNumRows(key, get_last_mod_time);
|
||||
}
|
||||
|
||||
Pipe StorageFile::read(
|
||||
class ReadFromFile : public SourceStepWithFilter
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "ReadFromFile"; }
|
||||
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
void applyFilters() override;
|
||||
|
||||
ReadFromFile(
|
||||
Block sample_block,
|
||||
std::shared_ptr<StorageFile> storage_,
|
||||
std::vector<std::string> paths_,
|
||||
std::optional<StorageFile::ArchiveInfo> archive_info_,
|
||||
NamesAndTypesList virtual_columns_,
|
||||
bool distributed_processing_,
|
||||
ReadFromFormatInfo info_,
|
||||
const bool need_only_count_,
|
||||
size_t total_bytes_to_read_,
|
||||
ContextPtr context_,
|
||||
size_t max_block_size_,
|
||||
size_t num_streams_)
|
||||
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
|
||||
, storage(std::move(storage_))
|
||||
, paths(std::move(paths_))
|
||||
, archive_info(std::move(archive_info_))
|
||||
, virtual_columns(std::move(virtual_columns_))
|
||||
, distributed_processing(distributed_processing_)
|
||||
, info(std::move(info_))
|
||||
, need_only_count(need_only_count_)
|
||||
, total_bytes_to_read(total_bytes_to_read_)
|
||||
, context(std::move(context_))
|
||||
, max_block_size(max_block_size_)
|
||||
, max_num_streams(num_streams_)
|
||||
{
|
||||
}
|
||||
|
||||
private:
|
||||
std::shared_ptr<StorageFile> storage;
|
||||
|
||||
std::vector<std::string> paths;
|
||||
std::optional<StorageFile::ArchiveInfo> archive_info;
|
||||
|
||||
NamesAndTypesList virtual_columns;
|
||||
const bool distributed_processing;
|
||||
|
||||
ReadFromFormatInfo info;
|
||||
const bool need_only_count;
|
||||
|
||||
size_t total_bytes_to_read;
|
||||
|
||||
ContextPtr context;
|
||||
|
||||
size_t max_block_size;
|
||||
const size_t max_num_streams;
|
||||
|
||||
std::shared_ptr<StorageFileSource::FilesIterator> files_iterator;
|
||||
|
||||
// FieldVectorPtr keys;
|
||||
// bool all_scan = false;
|
||||
|
||||
void createIterator(const ActionsDAG::Node * predicate);
|
||||
};
|
||||
|
||||
void ReadFromFile::applyFilters()
|
||||
{
|
||||
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
|
||||
const ActionsDAG::Node * predicate = nullptr;
|
||||
if (filter_actions_dag)
|
||||
predicate = filter_actions_dag->getOutputs().at(0);
|
||||
|
||||
createIterator(predicate);
|
||||
}
|
||||
|
||||
void StorageFile::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t max_block_size,
|
||||
const size_t max_num_streams)
|
||||
size_t num_streams)
|
||||
{
|
||||
if (use_table_fd)
|
||||
{
|
||||
@ -1346,17 +1435,48 @@ Pipe StorageFile::read(
|
||||
|
||||
if (p->size() == 1 && !fs::exists(p->at(0)))
|
||||
{
|
||||
if (context->getSettingsRef().engine_file_empty_if_not_exists)
|
||||
return Pipe(std::make_shared<NullSource>(storage_snapshot->getSampleBlockForColumns(column_names)));
|
||||
else
|
||||
if (!context->getSettingsRef().engine_file_empty_if_not_exists)
|
||||
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist", p->at(0));
|
||||
|
||||
auto header = storage_snapshot->getSampleBlockForColumns(column_names);
|
||||
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, archive_info, query_info.query, virtual_columns, context, distributed_processing);
|
||||
|
||||
auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());
|
||||
|
||||
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context), getVirtuals());
|
||||
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
|
||||
&& context->getSettingsRef().optimize_count_from_files;
|
||||
|
||||
auto reading = std::make_unique<ReadFromFile>(
|
||||
read_from_format_info.source_header,
|
||||
std::move(this_ptr),
|
||||
paths,
|
||||
archive_info,
|
||||
virtual_columns,
|
||||
distributed_processing,
|
||||
std::move(read_from_format_info),
|
||||
need_only_count,
|
||||
total_bytes_to_read,
|
||||
context,
|
||||
max_block_size,
|
||||
num_streams);
|
||||
|
||||
query_plan.addStep(std::move(reading));
|
||||
}
|
||||
|
||||
/// Creates the shared files iterator on first call; later calls leave the existing one untouched.
void ReadFromFile::createIterator(const ActionsDAG::Node * predicate)
{
    if (!files_iterator)
    {
        files_iterator = std::make_shared<StorageFileSource::FilesIterator>(
            paths, archive_info, predicate, virtual_columns, context, distributed_processing);
    }
}
|
||||
|
||||
void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
size_t num_streams = max_num_streams;
|
||||
|
||||
size_t files_to_read = 0;
|
||||
@ -1377,10 +1497,6 @@ Pipe StorageFile::read(
|
||||
if (progress_callback && !archive_info)
|
||||
progress_callback(FileProgress(0, total_bytes_to_read));
|
||||
|
||||
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context), getVirtuals());
|
||||
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
|
||||
&& context->getSettingsRef().optimize_count_from_files;
|
||||
|
||||
for (size_t i = 0; i < num_streams; ++i)
|
||||
{
|
||||
/// In case of reading from fd we have to check whether we have already created
|
||||
@ -1388,22 +1504,20 @@ Pipe StorageFile::read(
|
||||
/// If yes, then we should use it in StorageFileSource. Atomic bool flag is needed
|
||||
/// to prevent data race in case of parallel reads.
|
||||
std::unique_ptr<ReadBuffer> read_buffer;
|
||||
if (has_peekable_read_buffer_from_fd.exchange(false))
|
||||
read_buffer = std::move(peekable_read_buffer_from_fd);
|
||||
if (storage->has_peekable_read_buffer_from_fd.exchange(false))
|
||||
read_buffer = std::move(storage->peekable_read_buffer_from_fd);
|
||||
|
||||
pipes.emplace_back(std::make_shared<StorageFileSource>(
|
||||
read_from_format_info,
|
||||
this_ptr,
|
||||
storage_snapshot,
|
||||
info,
|
||||
storage,
|
||||
context,
|
||||
query_info,
|
||||
max_block_size,
|
||||
files_iterator,
|
||||
std::move(read_buffer),
|
||||
need_only_count));
|
||||
}
|
||||
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
pipeline.init(Pipe::unitePipes(std::move(pipes)));
|
||||
}
|
||||
|
||||
|
||||
|
@ -53,7 +53,8 @@ public:
|
||||
|
||||
std::string getName() const override { return "File"; }
|
||||
|
||||
Pipe read(
|
||||
void read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -137,6 +138,7 @@ public:
|
||||
protected:
|
||||
friend class StorageFileSource;
|
||||
friend class StorageFileSink;
|
||||
friend class ReadFromFile;
|
||||
|
||||
private:
|
||||
void setStorageMetadata(CommonArguments args);
|
||||
@ -199,6 +201,14 @@ public:
|
||||
ContextPtr context_,
|
||||
bool distributed_processing_ = false);
|
||||
|
||||
explicit FilesIterator(
|
||||
const Strings & files_,
|
||||
std::optional<StorageFile::ArchiveInfo> archive_info_,
|
||||
const ActionsDAG::Node * predicate,
|
||||
const NamesAndTypesList & virtual_columns,
|
||||
ContextPtr context_,
|
||||
bool distributed_processing_ = false);
|
||||
|
||||
String next();
|
||||
|
||||
bool isReadFromArchive() const
|
||||
@ -234,9 +244,7 @@ private:
|
||||
StorageFileSource(
|
||||
const ReadFromFormatInfo & info,
|
||||
std::shared_ptr<StorageFile> storage_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
ContextPtr context_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
UInt64 max_block_size_,
|
||||
FilesIteratorPtr files_iterator_,
|
||||
std::unique_ptr<ReadBuffer> read_buf_,
|
||||
@ -269,7 +277,6 @@ private:
|
||||
std::optional<size_t> tryGetNumRowsFromCache(const String & path, time_t last_mod_time) const;
|
||||
|
||||
std::shared_ptr<StorageFile> storage;
|
||||
StorageSnapshotPtr storage_snapshot;
|
||||
FilesIteratorPtr files_iterator;
|
||||
String current_path;
|
||||
std::optional<size_t> current_file_size;
|
||||
@ -290,7 +297,6 @@ private:
|
||||
Block block_for_format;
|
||||
|
||||
ContextPtr context; /// TODO Untangle potential issues with context lifetime.
|
||||
SelectQueryInfo query_info;
|
||||
UInt64 max_block_size;
|
||||
|
||||
bool finished_generate = false;
|
||||
|
@ -390,6 +390,42 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s
|
||||
block.getByName("_idx").column->assumeMutableRef().insert(idx);
|
||||
}
|
||||
|
||||
/// Derives from `predicate` a filter DAG restricted to the inputs of a sample block made of
/// the `_path` / `_file` virtual columns (those present in `virtual_columns`) plus `_idx`.
/// Returns an empty pointer when there is no predicate or no virtual columns at all.
ActionsDAGPtr createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const String & path_example)
{
    if (predicate == nullptr || virtual_columns.empty())
        return {};

    Block sample;
    for (const auto & virtual_column : virtual_columns)
    {
        const bool is_path_or_file = virtual_column.name == "_file" || virtual_column.name == "_path";
        if (is_path_or_file)
            sample.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
    }

    /// "_idx" is appended as the column that carries the filter result.
    sample.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});

    /// Fill the sample block with a single row built from the example path.
    addPathAndFileToVirtualColumns(sample, path_example, 0);

    return splitFilterDagForAllowedInputs(predicate, sample);
}
|
||||
|
||||
/// Builds a block with one row per entry of `paths` (the `_path` / `_file` virtual columns
/// that are present, plus `_idx` holding the entry's position), filters it with `dag`
/// and returns the resulting `_idx` column.
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ActionsDAGPtr & dag, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
    Block candidates;
    for (const auto & virtual_column : virtual_columns)
    {
        const bool is_path_or_file = virtual_column.name == "_file" || virtual_column.name == "_path";
        if (is_path_or_file)
            candidates.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
    }
    candidates.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});

    size_t position = 0;
    for (const auto & path : paths)
    {
        addPathAndFileToVirtualColumns(candidates, path, position);
        ++position;
    }

    filterBlockWithDAG(dag, candidates, context);

    return candidates.getByName("_idx").column;
}
|
||||
|
||||
ASTPtr createPathAndFileFilterAst(const ASTPtr & query, const NamesAndTypesList & virtual_columns, const String & path_example, const ContextPtr & context)
|
||||
{
|
||||
if (!query || virtual_columns.empty())
|
||||
|
@ -77,6 +77,25 @@ void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & pa
|
||||
sources = std::move(filtered_sources);
|
||||
}
|
||||
|
||||
ActionsDAGPtr createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const String & path_example);
|
||||
|
||||
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ActionsDAGPtr & dag, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
|
||||
|
||||
template <typename T>
|
||||
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ActionsDAGPtr & dag, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
|
||||
{
|
||||
auto indexes_column = getFilterByPathAndFileIndexes(paths, dag, virtual_columns, context);
|
||||
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
|
||||
if (indexes.size() == sources.size())
|
||||
return;
|
||||
|
||||
std::vector<T> filtered_sources;
|
||||
filtered_sources.reserve(indexes.size());
|
||||
for (auto index : indexes)
|
||||
filtered_sources.emplace_back(std::move(sources[index]));
|
||||
sources = std::move(filtered_sources);
|
||||
}
|
||||
|
||||
void addRequestedPathFileAndSizeVirtualsToChunk(
|
||||
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path, std::optional<size_t> size, const String * filename = nullptr);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user