Use filtering by file/path before reading in the url/file/hdfs table functions; reduce code duplication

This commit is contained in:
avogar 2023-08-17 16:54:43 +00:00
parent 1917fa9b4a
commit 4c32097df3
36 changed files with 442 additions and 441 deletions

View File

@ -366,6 +366,8 @@ The server successfully detected this situation and will download merged part fr
M(DiskS3PutObject, "Number of DiskS3 API PutObject calls.") \
M(DiskS3GetObject, "Number of DiskS3 API GetObject calls.") \
\
M(EngineFileLikeReadFiles, "Number of files read in table engines working with files (like File/S3/URL/HDFS).") \
\
M(ReadBufferFromS3Microseconds, "Time spent on reading from S3.") \
M(ReadBufferFromS3InitMicroseconds, "Time spent initializing connection to S3.") \
M(ReadBufferFromS3Bytes, "Bytes read from S3.") \

View File

@ -29,7 +29,7 @@
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <Storages/PartitionedSink.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Formats/ReadSchemaUtils.h>
@ -50,6 +50,11 @@
namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event EngineFileLikeReadFiles;
}
namespace DB
{
namespace ErrorCodes
@ -291,12 +296,7 @@ StorageHDFS::StorageHDFS(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
ColumnsDescription StorageHDFS::getTableStructureFromData(
@ -363,11 +363,22 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
class HDFSSource::DisclosedGlobIterator::Impl
{
public:
Impl(ContextPtr context_, const String & uri)
Impl(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
uris = getPathsList(path_from_uri, uri_without_path, context_);
auto file_progress_callback = context_->getFileProgressCallback();
uris = getPathsList(path_from_uri, uri_without_path, context);
auto filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, context);
if (filter_ast)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & path_with_info : uris)
paths.push_back(path_with_info.path);
VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, context, filter_ast);
}
auto file_progress_callback = context->getFileProgressCallback();
for (auto & elem : uris)
{
elem.path = uri_without_path + elem.path;
@ -397,9 +408,20 @@ private:
class HDFSSource::URISIterator::Impl : WithContext
{
public:
explicit Impl(const std::vector<String> & uris_, ContextPtr context_)
explicit Impl(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context_)
: WithContext(context_), uris(uris_), file_progress_callback(context_->getFileProgressCallback())
{
auto filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, getContext());
if (filter_ast)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & uri : uris)
paths.push_back(getPathFromUriAndUriWithoutPath(uri).first);
VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, getContext(), filter_ast);
}
if (!uris.empty())
{
auto path_and_uri = getPathFromUriAndUriWithoutPath(uris[0]);
@ -444,16 +466,16 @@ private:
std::function<void(FileProgress)> file_progress_callback;
};
HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(ContextPtr context_, const String & uri)
: pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(context_, uri)) {}
HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(uri, query, virtual_columns, context)) {}
StorageHDFS::PathWithInfo HDFSSource::DisclosedGlobIterator::next()
{
return pimpl->next();
}
HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, ContextPtr context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, context))
HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, query, virtual_columns, context))
{
}
@ -535,6 +557,8 @@ bool HDFSSource::initialize()
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);
return true;
}
@ -561,24 +585,7 @@ Chunk HDFSSource::generate()
UInt64 num_rows = chunk.getNumRows();
size_t chunk_size = input_format->getApproxBytesReadForChunk();
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, current_path);
columns.push_back(column->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);
auto column = DataTypeLowCardinality{std::make_shared<DataTypeString>()}.createColumnConst(num_rows, std::move(file_name));
columns.push_back(column->convertToFullColumnIfConst());
}
}
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, current_path);
return Chunk(std::move(columns), num_rows);
}
@ -727,7 +734,7 @@ bool StorageHDFS::supportsSubsetOfColumns() const
Pipe StorageHDFS::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
SelectQueryInfo & query_info,
ContextPtr context_,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
@ -744,7 +751,7 @@ Pipe StorageHDFS::read(
else if (is_path_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context_, uris[0]);
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uris[0], query_info.query, virtual_columns, context_);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
{
return glob_iterator->next();
@ -752,7 +759,7 @@ Pipe StorageHDFS::read(
}
else
{
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, context_);
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, query_info.query, virtual_columns, context_);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
{
return uris_iterator->next();

View File

@ -123,7 +123,7 @@ public:
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(ContextPtr context_, const String & uri_);
DisclosedGlobIterator(const String & uri_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
StorageHDFS::PathWithInfo next();
private:
class Impl;
@ -134,7 +134,7 @@ public:
class URISIterator
{
public:
URISIterator(const std::vector<String> & uris_, ContextPtr context);
URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
StorageHDFS::PathWithInfo next();
private:
class Impl;

View File

@ -21,6 +21,7 @@
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/VirtualColumnUtils.h>
#include <TableFunctions/TableFunctionHDFSCluster.h>
#include <memory>
@ -64,6 +65,8 @@ StorageHDFSCluster::StorageHDFSCluster(
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
void StorageHDFSCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context)
@ -76,9 +79,9 @@ void StorageHDFSCluster::addColumnsStructureToQuery(ASTPtr & query, const String
}
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr, const ContextPtr & context) const
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
{
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uri, query, virtual_columns, context);
auto callback = std::make_shared<std::function<String()>>([iter = std::move(iterator)]() mutable -> String { return iter->next().path; });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}

View File

@ -45,6 +45,7 @@ private:
String uri;
String format_name;
String compression_method;
NamesAndTypesList virtual_columns;
};

View File

@ -25,7 +25,6 @@
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/VirtualColumnUtils.h>
# include <Storages/getVirtualsForStorage.h>
# include <Formats/FormatFactory.h>
@ -70,13 +69,13 @@ StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_)
: max_poll_size(max_poll_size_)
, glob_iterator(std::make_unique<StorageS3QueueSource::DisclosedGlobIterator>(
client_, globbed_uri_, query, virtual_header, context, nullptr, request_settings_))
client_, globbed_uri_, query, virtual_columns, context, nullptr, request_settings_))
{
/// todo(kssenii): remove this loop, it should not be here
while (true)

View File

@ -44,7 +44,7 @@ public:
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
const NamesAndTypesList & virtual_header,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_ = {});

View File

@ -32,7 +32,6 @@
# include <Storages/StorageS3.h>
# include <Storages/StorageSnapshot.h>
# include <Storages/VirtualColumnUtils.h>
# include <Storages/getVirtualsForStorage.h>
# include <Storages/prepareReadingFromFormat.h>
# include <Common/NamedCollections/NamedCollections.h>
@ -171,15 +170,7 @@ StorageS3Queue::StorageS3Queue(
}
files_metadata = std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings);
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
auto poll_thread = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
task = std::make_shared<TaskContext>(std::move(poll_thread));
@ -527,7 +518,7 @@ StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
*configuration.client,
configuration.url,
query,
virtual_block,
virtual_columns,
local_context,
s3queue_settings->s3queue_polling_size.value,
configuration.request_settings);

View File

@ -93,7 +93,6 @@ private:
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
Configuration configuration;
NamesAndTypesList virtual_columns;
Block virtual_block;
UInt64 reschedule_processing_interval_ms;
std::optional<FormatSettings> format_settings;

View File

@ -8,7 +8,6 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/SharedThreadPools.h>
#include <Parsers/ASTCreateQuery.h>
@ -18,7 +17,6 @@
#include <DataTypes/DataTypesNumber.h>
#include <re2/re2.h>
#include <azure/identity/managed_identity_credential.hpp>
#include <azure/storage/common/storage_credential.hpp>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
@ -29,7 +27,6 @@
#include <Storages/StorageSnapshot.h>
#include <Storages/PartitionedSink.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/StorageURL.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Common/parseGlobs.h>
@ -51,6 +48,11 @@ namespace CurrentMetrics
extern const Metric ObjectStorageAzureThreadsActive;
}
namespace ProfileEvents
{
extern const Event EngineFileLikeReadFiles;
}
namespace DB
{
@ -479,15 +481,7 @@ StorageAzureBlob::StorageAzureBlob(
for (const auto & key : configuration.blobs_paths)
objects.emplace_back(key);
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
void StorageAzureBlob::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
@ -684,13 +678,13 @@ Pipe StorageAzureBlob::read(
/// Iterate through disclosed globs and make a source for each file
iterator_wrapper = std::make_shared<StorageAzureBlobSource::GlobIterator>(
object_storage.get(), configuration.container, configuration.blob_path,
query_info.query, virtual_block, local_context, nullptr, local_context->getFileProgressCallback());
query_info.query, virtual_columns, local_context, nullptr, local_context->getFileProgressCallback());
}
else
{
iterator_wrapper = std::make_shared<StorageAzureBlobSource::KeysIterator>(
object_storage.get(), configuration.container, configuration.blobs_paths,
query_info.query, virtual_block, local_context, nullptr, local_context->getFileProgressCallback());
query_info.query, virtual_columns, local_context, nullptr, local_context->getFileProgressCallback());
}
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
@ -807,29 +801,12 @@ bool StorageAzureBlob::parallelizeOutputAfterReading(ContextPtr context) const
return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration.format, context);
}
static void addPathToVirtualColumns(Block & block, const String & path, size_t idx)
{
if (block.has("_path"))
block.getByName("_path").column->assumeMutableRef().insert(path);
if (block.has("_file"))
{
auto pos = path.find_last_of('/');
assert(pos != std::string::npos);
auto file = path.substr(pos + 1);
block.getByName("_file").column->assumeMutableRef().insert(file);
}
block.getByName("_idx").column->assumeMutableRef().insert(idx);
}
StorageAzureBlobSource::GlobIterator::GlobIterator(
AzureObjectStorage * object_storage_,
const std::string & container_,
String blob_path_with_globs_,
ASTPtr query_,
const Block & virtual_header_,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs_,
std::function<void(FileProgress)> file_progress_callback_)
@ -838,7 +815,7 @@ StorageAzureBlobSource::GlobIterator::GlobIterator(
, container(container_)
, blob_path_with_globs(blob_path_with_globs_)
, query(query_)
, virtual_header(virtual_header_)
, virtual_columns(virtual_columns_)
, outer_blobs(outer_blobs_)
, file_progress_callback(file_progress_callback_)
{
@ -906,40 +883,28 @@ RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next()
index = 0;
if (!is_initialized)
{
createFilterAST(new_batch.front().relative_path);
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, getContext());
is_initialized = true;
}
if (filter_ast)
{
auto block = virtual_header.cloneEmpty();
for (size_t i = 0; i < new_batch.size(); ++i)
addPathToVirtualColumns(block, fs::path(container) / new_batch[i].relative_path, i);
std::vector<String> paths;
paths.reserve(new_batch.size());
for (auto & path_with_metadata : new_batch)
paths.push_back(fs::path(container) / path_with_metadata.relative_path);
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
blobs_with_metadata.clear();
for (UInt64 idx : idxs.getData())
{
if (file_progress_callback)
file_progress_callback(FileProgress(0, new_batch[idx].metadata.size_bytes));
blobs_with_metadata.emplace_back(std::move(new_batch[idx]));
if (outer_blobs)
outer_blobs->emplace_back(blobs_with_metadata.back());
}
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, query, virtual_columns, getContext(), filter_ast);
}
else
{
if (outer_blobs)
outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end());
blobs_with_metadata = std::move(new_batch);
if (file_progress_callback)
{
for (const auto & [_, info] : blobs_with_metadata)
file_progress_callback(FileProgress(0, info.size_bytes));
}
if (outer_blobs)
outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end());
blobs_with_metadata = std::move(new_batch);
if (file_progress_callback)
{
for (const auto & [_, info] : blobs_with_metadata)
file_progress_callback(FileProgress(0, info.size_bytes));
}
}
@ -949,28 +914,12 @@ RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next()
return blobs_with_metadata[current_index];
}
void StorageAzureBlobSource::GlobIterator::createFilterAST(const String & any_key)
{
if (!query || !virtual_header)
return;
/// Create a virtual block with one row to construct filter
/// Append "idx" column as the filter result
virtual_header.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
auto block = virtual_header.cloneEmpty();
addPathToVirtualColumns(block, fs::path(container) / any_key, 0);
VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
}
StorageAzureBlobSource::KeysIterator::KeysIterator(
AzureObjectStorage * object_storage_,
const std::string & container_,
const Strings & keys_,
ASTPtr query_,
const Block & virtual_header_,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs,
std::function<void(FileProgress)> file_progress_callback)
@ -978,37 +927,20 @@ StorageAzureBlobSource::KeysIterator::KeysIterator(
, object_storage(object_storage_)
, container(container_)
, query(query_)
, virtual_header(virtual_header_)
, virtual_columns(virtual_columns_)
{
Strings all_keys = keys_;
/// Create a virtual block with one row to construct filter
if (query && virtual_header && !all_keys.empty())
auto filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, getContext());
if (filter_ast)
{
/// Append "idx" column as the filter result
virtual_header.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
Strings paths;
paths.reserve(all_keys.size());
for (const auto & key : all_keys)
paths.push_back(fs::path(container) / key);
auto block = virtual_header.cloneEmpty();
addPathToVirtualColumns(block, fs::path(container) / all_keys.front(), 0);
VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
if (filter_ast)
{
block = virtual_header.cloneEmpty();
for (size_t i = 0; i < all_keys.size(); ++i)
addPathToVirtualColumns(block, fs::path(container) / all_keys[i], i);
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
Strings filtered_keys;
filtered_keys.reserve(block.rows());
for (UInt64 idx : idxs.getData())
filtered_keys.emplace_back(std::move(all_keys[idx]));
all_keys = std::move(filtered_keys);
}
VirtualColumnUtils::filterByPathOrFile(all_keys, paths, query, virtual_columns, getContext(), filter_ast);
}
for (auto && key : all_keys)
@ -1049,22 +981,7 @@ Chunk StorageAzureBlobSource::generate()
UInt64 num_rows = chunk.getNumRows();
size_t chunk_size = reader.getInputFormat()->getApproxBytesReadForChunk();
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
const auto & file_path = reader.getPath();
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = file_path.find_last_of('/');
auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1));
chunk.addColumn(column->convertToFullColumnIfConst());
}
}
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath());
return chunk;
}
@ -1163,6 +1080,8 @@ StorageAzureBlobSource::ReaderHolder StorageAzureBlobSource::createReader()
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto current_reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);
return ReaderHolder{fs::path(container) / current_key, std::move(read_buf), std::move(input_format), std::move(pipeline), std::move(current_reader)};
}
@ -1207,12 +1126,12 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData(
else if (configuration.withGlobs())
{
file_iterator = std::make_shared<StorageAzureBlobSource::GlobIterator>(
object_storage, configuration.container, configuration.blob_path, nullptr, Block{}, ctx, &read_keys);
object_storage, configuration.container, configuration.blob_path, nullptr, NamesAndTypesList{}, ctx, &read_keys);
}
else
{
file_iterator = std::make_shared<StorageAzureBlobSource::KeysIterator>(
object_storage, configuration.container, configuration.blobs_paths, nullptr, Block{}, ctx, &read_keys);
object_storage, configuration.container, configuration.blobs_paths, nullptr, NamesAndTypesList{}, ctx, &read_keys);
}
std::optional<ColumnsDescription> columns_from_cache;

View File

@ -117,7 +117,6 @@ private:
Configuration configuration;
std::unique_ptr<AzureObjectStorage> object_storage;
NamesAndTypesList virtual_columns;
Block virtual_block;
const bool distributed_processing;
std::optional<FormatSettings> format_settings;
@ -162,7 +161,7 @@ public:
const std::string & container_,
String blob_path_with_globs_,
ASTPtr query_,
const Block & virtual_header_,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs_,
std::function<void(FileProgress)> file_progress_callback_ = {});
@ -176,7 +175,7 @@ public:
String blob_path_with_globs;
ASTPtr query;
ASTPtr filter_ast;
Block virtual_header;
NamesAndTypesList virtual_columns;
size_t index = 0;
@ -218,7 +217,7 @@ public:
const std::string & container_,
const Strings & keys_,
ASTPtr query_,
const Block & virtual_header_,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
RelativePathsWithMetadata * outer_blobs,
std::function<void(FileProgress)> file_progress_callback = {});
@ -232,8 +231,7 @@ public:
RelativePathsWithMetadata keys;
ASTPtr query;
ASTPtr filter_ast;
Block virtual_header;
NamesAndTypesList virtual_columns;
std::atomic<size_t> index = 0;
};

View File

@ -4,8 +4,6 @@
#if USE_AZURE_BLOB_STORAGE
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Processors/Sources/RemoteSource.h>
@ -13,10 +11,9 @@
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Storages/IStorage.h>
#include <Storages/StorageURL.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Exception.h>
#include <Parsers/queryToString.h>
#include <TableFunctions/TableFunctionAzureBlobStorageCluster.h>
@ -60,14 +57,7 @@ StorageAzureBlobCluster::StorageAzureBlobCluster(
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
void StorageAzureBlobCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context)
@ -83,7 +73,7 @@ RemoteQueryExecutor::Extension StorageAzureBlobCluster::getTaskIteratorExtension
{
auto iterator = std::make_shared<StorageAzureBlobSource::GlobIterator>(
object_storage.get(), configuration.container, configuration.blob_path,
query, virtual_block, context, nullptr);
query, virtual_columns, context, nullptr);
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String{ return iterator->next().relative_path; });
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}

View File

@ -36,6 +36,8 @@ public:
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
bool supportsSubcolumns() const override { return true; }
private:
void updateBeforeRead(const ContextPtr & /*context*/) override {}
@ -43,7 +45,6 @@ private:
StorageAzureBlob::Configuration configuration;
NamesAndTypesList virtual_columns;
Block virtual_block;
std::unique_ptr<AzureObjectStorage> object_storage;
};

View File

@ -6,11 +6,13 @@
#include <Storages/Distributed/DistributedAsyncInsertSource.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Storages/VirtualColumnUtils.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTIdentifier_fwd.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
@ -64,6 +66,7 @@ namespace ProfileEvents
extern const Event CreatedReadBufferOrdinary;
extern const Event CreatedReadBufferMMap;
extern const Event CreatedReadBufferMMapFailed;
extern const Event EngineFileLikeReadFiles;
}
namespace fs = std::filesystem;
@ -699,6 +702,8 @@ void StorageFile::setStorageMetadata(CommonArguments args)
storage_metadata.setConstraints(args.constraints);
storage_metadata.setComment(args.comment);
setInMemoryMetadata(storage_metadata);
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
@ -721,9 +726,30 @@ public:
{
public:
explicit FilesIterator(
const Strings & files_, std::vector<std::string> archives_, std::vector<std::pair<uint64_t, std::string>> files_in_archive_)
const Strings & files_,
std::vector<std::string> archives_,
std::vector<std::pair<uint64_t, std::string>> files_in_archive_,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
ContextPtr context_)
: files(files_), archives(std::move(archives_)), files_in_archive(std::move(files_in_archive_))
{
auto filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, context_);
if (filter_ast)
{
if (files_in_archive.empty())
{
VirtualColumnUtils::filterByPathOrFile(files, files, query, virtual_columns, context_, filter_ast);
}
else
{
Strings paths;
paths.reserve(files_in_archive.size());
for (const auto & [_, file] : files_in_archive)
paths.push_back(file);
VirtualColumnUtils::filterByPathOrFile(files_in_archive, paths, query, virtual_columns, context_, filter_ast);
}
}
}
String next()
@ -946,8 +972,9 @@ public:
});
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);
}
Chunk chunk;
@ -960,21 +987,7 @@ public:
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
/// Enrich with virtual columns.
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, current_path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = current_path.find_last_of('/');
auto file_name = current_path.substr(last_slash_pos + 1);
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_name)->convertToFullColumnIfConst());
}
}
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, current_path);
return chunk;
}
@ -1028,7 +1041,7 @@ private:
Pipe StorageFile::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
@ -1093,7 +1106,7 @@ Pipe StorageFile::read(
files_in_archive_num = files_in_archive.size();
}
auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, paths_to_archive, std::move(files_in_archive));
auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, paths_to_archive, std::move(files_in_archive), query_info.query, virtual_columns, context);
auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());
size_t num_streams = max_num_streams;
@ -1648,14 +1661,6 @@ void registerStorageFile(StorageFactory & factory)
storage_features);
}
NamesAndTypesList StorageFile::getVirtuals() const
{
return NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
SchemaCache & StorageFile::getSchemaCache(const ContextPtr & context)
{
static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_file", DEFAULT_SCHEMA_CACHE_ELEMENTS));

View File

@ -65,7 +65,7 @@ public:
bool storesDataOnDisk() const override;
Strings getDataPaths() const override;
NamesAndTypesList getVirtuals() const override;
NamesAndTypesList getVirtuals() const override { return virtual_columns; }
static Strings getPathsList(const String & table_path, const String & user_files_path, ContextPtr context, size_t & total_bytes_to_read);
@ -152,6 +152,8 @@ private:
std::atomic<int32_t> readers_counter = 0;
FileRenamer file_renamer;
bool was_renamed = false;
NamesAndTypesList virtual_columns;
};
}

View File

@ -25,7 +25,6 @@
#include <Storages/StorageSnapshot.h>
#include <Storages/PartitionedSink.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageURL.h>
#include <Storages/NamedCollectionsHelpers.h>
@ -77,6 +76,7 @@ namespace ProfileEvents
{
extern const Event S3DeleteObjects;
extern const Event S3ListObjects;
extern const Event EngineFileLikeReadFiles;
}
namespace DB
@ -121,23 +121,6 @@ namespace ErrorCodes
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
static void addPathToVirtualColumns(Block & block, const String & path, size_t idx)
{
if (block.has("_path"))
block.getByName("_path").column->assumeMutableRef().insert(path);
if (block.has("_file"))
{
auto pos = path.find_last_of('/');
assert(pos != std::string::npos);
auto file = path.substr(pos + 1);
block.getByName("_file").column->assumeMutableRef().insert(file);
}
block.getByName("_idx").column->assumeMutableRef().insert(idx);
}
class StorageS3Source::DisclosedGlobIterator::Impl : WithContext
{
public:
@ -145,7 +128,7 @@ public:
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr & query_,
const Block & virtual_header_,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
KeysWithInfo * read_keys_,
const S3Settings::RequestSettings & request_settings_,
@ -154,7 +137,7 @@ public:
, client(client_.clone())
, globbed_uri(globbed_uri_)
, query(query_)
, virtual_header(virtual_header_)
, virtual_columns(virtual_columns_)
, read_keys(read_keys_)
, request_settings(request_settings_)
, list_objects_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1)
@ -293,35 +276,26 @@ private:
if (!is_initialized)
{
createFilterAST(temp_buffer.front().key);
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, getContext());
is_initialized = true;
}
if (filter_ast)
{
auto block = virtual_header.cloneEmpty();
for (size_t i = 0; i < temp_buffer.size(); ++i)
addPathToVirtualColumns(block, fs::path(globbed_uri.bucket) / temp_buffer[i].key, i);
std::vector<String> paths;
paths.reserve(temp_buffer.size());
for (const auto & key_with_info : temp_buffer)
paths.push_back(fs::path(globbed_uri.bucket) / key_with_info.key);
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
buffer.reserve(block.rows());
for (UInt64 idx : idxs.getData())
{
if (file_progress_callback)
file_progress_callback(FileProgress(0, temp_buffer[idx].info->size));
buffer.emplace_back(std::move(temp_buffer[idx]));
}
VirtualColumnUtils::filterByPathOrFile(temp_buffer, paths, query, virtual_columns, getContext(), filter_ast);
}
else
buffer = std::move(temp_buffer);
if (file_progress_callback)
{
buffer = std::move(temp_buffer);
if (file_progress_callback)
{
for (const auto & [_, info] : buffer)
file_progress_callback(FileProgress(0, info->size));
}
for (const auto & [_, info] : buffer)
file_progress_callback(FileProgress(0, info->size));
}
/// Set iterator only after the whole batch is processed
@ -331,20 +305,6 @@ private:
read_keys->insert(read_keys->end(), buffer.begin(), buffer.end());
}
void createFilterAST(const String & any_key)
{
if (!query || !virtual_header)
return;
/// Create a virtual block with one row to construct filter
/// Append "idx" column as the filter result
virtual_header.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
auto block = virtual_header.cloneEmpty();
addPathToVirtualColumns(block, fs::path(globbed_uri.bucket) / any_key, 0);
VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
}
std::future<ListObjectsOutcome> listObjectsAsync()
{
return list_objects_scheduler([this]
@ -368,7 +328,7 @@ private:
std::unique_ptr<S3::Client> client;
S3::URI globbed_uri;
ASTPtr query;
Block virtual_header;
NamesAndTypesList virtual_columns;
bool is_initialized{false};
ASTPtr filter_ast;
std::unique_ptr<re2::RE2> matcher;
@ -389,12 +349,12 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
const NamesAndTypesList & virtual_columns_,
ContextPtr context,
KeysWithInfo * read_keys_,
const S3Settings::RequestSettings & request_settings_,
std::function<void(FileProgress)> file_progress_callback_)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context, read_keys_, request_settings_, file_progress_callback_))
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_columns_, context, read_keys_, request_settings_, file_progress_callback_))
{
}
@ -413,7 +373,7 @@ public:
const String & bucket_,
const S3Settings::RequestSettings & request_settings_,
ASTPtr query_,
const Block & virtual_header_,
const NamesAndTypesList & virtual_columns_,
ContextPtr context_,
KeysWithInfo * read_keys_,
std::function<void(FileProgress)> file_progress_callback_)
@ -424,37 +384,19 @@ public:
, bucket(bucket_)
, request_settings(request_settings_)
, query(query_)
, virtual_header(virtual_header_)
, virtual_columns(virtual_columns_)
, file_progress_callback(file_progress_callback_)
{
/// Create a virtual block with one row to construct filter
if (query && virtual_header && !keys.empty())
auto filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, getContext());
if (filter_ast)
{
/// Append "idx" column as the filter result
virtual_header.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
std::vector<String> paths;
paths.reserve(keys.size());
for (const auto & key : keys)
paths.push_back(fs::path(bucket) / key);
auto block = virtual_header.cloneEmpty();
addPathToVirtualColumns(block, fs::path(bucket) / keys.front(), 0);
ASTPtr filter_ast;
VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
if (filter_ast)
{
block = virtual_header.cloneEmpty();
for (size_t i = 0; i < keys.size(); ++i)
addPathToVirtualColumns(block, fs::path(bucket) / keys[i], i);
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
Strings filtered_keys;
filtered_keys.reserve(block.rows());
for (UInt64 idx : idxs.getData())
filtered_keys.emplace_back(std::move(keys[idx]));
keys = std::move(filtered_keys);
}
VirtualColumnUtils::filterByPathOrFile(keys, paths, query, virtual_columns, getContext(), filter_ast);
}
if (read_keys_)
@ -488,7 +430,7 @@ private:
String bucket;
S3Settings::RequestSettings request_settings;
ASTPtr query;
Block virtual_header;
NamesAndTypesList virtual_columns;
std::function<void(FileProgress)> file_progress_callback;
};
@ -499,13 +441,13 @@ StorageS3Source::KeysIterator::KeysIterator(
const String & bucket_,
const S3Settings::RequestSettings & request_settings_,
ASTPtr query,
const Block & virtual_header,
const NamesAndTypesList & virtual_columns_,
ContextPtr context,
KeysWithInfo * read_keys,
std::function<void(FileProgress)> file_progress_callback_)
: pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(
client_, version_id_, keys_, bucket_, request_settings_,
query, virtual_header, context, read_keys, file_progress_callback_))
query, virtual_columns_, context, read_keys, file_progress_callback_))
{
}
@ -595,6 +537,8 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto current_reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);
return ReaderHolder{key_with_info.key, bucket, std::move(read_buf), std::move(input_format), std::move(pipeline), std::move(current_reader)};
}
@ -696,22 +640,7 @@ Chunk StorageS3Source::generate()
UInt64 num_rows = chunk.getNumRows();
size_t chunk_size = reader.getInputFormat()->getApproxBytesReadForChunk();
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
const auto & file_path = reader.getPath();
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = file_path.find_last_of('/');
auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1));
chunk.addColumn(column->convertToFullColumnIfConst());
}
}
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath());
return chunk;
}
@ -956,14 +885,7 @@ StorageS3::StorageS3(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
@ -971,7 +893,7 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
bool distributed_processing,
ContextPtr local_context,
ASTPtr query,
const Block & virtual_block,
const NamesAndTypesList & virtual_columns,
KeysWithInfo * read_keys,
std::function<void(FileProgress)> file_progress_callback)
{
@ -983,7 +905,7 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
{
/// Iterate through disclosed globs and make a source for each file
return std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*configuration.client, configuration.url, query, virtual_block,
*configuration.client, configuration.url, query, virtual_columns,
local_context, read_keys, configuration.request_settings, file_progress_callback);
}
else
@ -991,7 +913,7 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
return std::make_shared<StorageS3Source::KeysIterator>(
*configuration.client, configuration.url.version_id, configuration.keys,
configuration.url.bucket, configuration.request_settings, query,
virtual_block, local_context, read_keys, file_progress_callback);
virtual_columns, local_context, read_keys, file_progress_callback);
}
}
@ -1027,7 +949,7 @@ Pipe StorageS3::read(
Pipes pipes;
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
query_configuration, distributed_processing, local_context, query_info.query, virtual_block, nullptr, local_context->getFileProgressCallback());
query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());

View File

@ -11,17 +11,18 @@
#include <Storages/IStorage.h>
#include <Storages/StorageS3Settings.h>
#include <Processors/ISource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Poco/URI.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/CompressionMethod.h>
#include <IO/SeekableReadBuffer.h>
#include <Interpreters/Context.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/StorageConfiguration.h>
#include <Storages/prepareReadingFromFormat.h>
# include <IO/CompressionMethod.h>
# include <IO/S3/getObjectInfo.h>
# include <IO/SeekableReadBuffer.h>
# include <Interpreters/Context.h>
# include <Interpreters/threadPoolCallbackRunner.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/ISource.h>
# include <Processors/Formats/IInputFormat.h>
# include <Storages/Cache/SchemaCache.h>
# include <Storages/StorageConfiguration.h>
# include <Storages/prepareReadingFromFormat.h>
# include <Poco/URI.h>
namespace Aws::S3
{
@ -68,7 +69,7 @@ public:
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
KeysWithInfo * read_keys_ = nullptr,
const S3Settings::RequestSettings & request_settings_ = {},
@ -92,7 +93,7 @@ public:
const String & bucket_,
const S3Settings::RequestSettings & request_settings_,
ASTPtr query,
const Block & virtual_header,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> progress_callback_ = {});
@ -333,7 +334,6 @@ private:
Configuration configuration;
std::mutex configuration_update_mutex;
NamesAndTypesList virtual_columns;
Block virtual_block;
String name;
const bool distributed_processing;
@ -347,7 +347,7 @@ private:
bool distributed_processing,
ContextPtr local_context,
ASTPtr query,
const Block & virtual_block,
const NamesAndTypesList & virtual_columns,
KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> progress_callback = {});

View File

@ -16,7 +16,7 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Exception.h>
#include <Parsers/queryToString.h>
#include <TableFunctions/TableFunctionS3Cluster.h>
@ -61,14 +61,7 @@ StorageS3Cluster::StorageS3Cluster(
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
void StorageS3Cluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context)
@ -88,7 +81,7 @@ void StorageS3Cluster::updateConfigurationIfChanged(ContextPtr local_context)
RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
{
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.url, query, virtual_block, context, nullptr, s3_configuration.request_settings, context->getFileProgressCallback());
*s3_configuration.client, s3_configuration.url, query, virtual_columns, context, nullptr, s3_configuration.request_settings, context->getFileProgressCallback());
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next().key; });
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}

View File

@ -38,6 +38,8 @@ public:
bool supportsSubcolumns() const override { return true; }
bool supportsTrivialCountOptimization() const override { return true; }
protected:
void updateConfigurationIfChanged(ContextPtr local_context);
@ -48,7 +50,6 @@ private:
StorageS3::Configuration s3_configuration;
NamesAndTypesList virtual_columns;
Block virtual_block;
};

View File

@ -2,6 +2,7 @@
#include <Storages/PartitionedSink.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/VirtualColumnUtils.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/threadPoolCallbackRunner.h>
@ -27,6 +28,7 @@
#include <Common/ThreadStatus.h>
#include <Common/parseRemoteDescription.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Common/ProfileEvents.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/HTTPHeaderEntries.h>
@ -38,6 +40,10 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace ProfileEvents
{
extern const Event EngineFileLikeReadFiles;
}
namespace DB
{
@ -125,6 +131,8 @@ IStorageURLBase::IStorageURLBase(
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
@ -158,9 +166,21 @@ namespace
class StorageURLSource::DisclosedGlobIterator::Impl
{
public:
Impl(const String & uri, size_t max_addresses)
Impl(const String & uri_, size_t max_addresses, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
uris = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
uris = parseRemoteDescription(uri_, 0, uri_.size(), ',', max_addresses);
auto filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, context);
if (filter_ast)
{
std::vector<String> paths;
paths.reserve(uris.size());
for (const auto & uri : uris)
paths.push_back(Poco::URI(uri).getPath());
VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, context, filter_ast);
}
}
String next()
@ -182,8 +202,8 @@ private:
std::atomic_size_t index = 0;
};
StorageURLSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, size_t max_addresses)
: pimpl(std::make_shared<StorageURLSource::DisclosedGlobIterator::Impl>(uri, max_addresses)) {}
StorageURLSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, size_t max_addresses, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
: pimpl(std::make_shared<StorageURLSource::DisclosedGlobIterator::Impl>(uri, max_addresses, query, virtual_columns, context)) {}
String StorageURLSource::DisclosedGlobIterator::next()
{
@ -270,7 +290,6 @@ StorageURLSource::StorageURLSource(
if (auto file_progress_callback = context->getFileProgressCallback())
{
size_t file_size = tryGetFileSizeFromReadBuffer(*read_buf).value_or(0);
LOG_DEBUG(&Poco::Logger::get("URL"), "Send file size {}", file_size);
file_progress_callback(FileProgress(0, file_size));
}
@ -307,6 +326,8 @@ StorageURLSource::StorageURLSource(
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);
return true;
};
}
@ -331,23 +352,7 @@ Chunk StorageURLSource::generate()
UInt64 num_rows = chunk.getNumRows();
size_t chunk_size = input_format->getApproxBytesReadForChunk();
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
const String & path{curr_uri.getPath()};
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = path.find_last_of('/');
auto column = virtual_column.type->createColumnConst(num_rows, path.substr(last_slash_pos + 1));
chunk.addColumn(column->convertToFullColumnIfConst());
}
}
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, curr_uri.getPath());
return chunk;
}
@ -723,7 +728,7 @@ Pipe IStorageURLBase::read(
else if (is_url_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(uri, max_addresses);
auto glob_iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(uri, max_addresses, query_info.query, virtual_columns, local_context);
iterator_wrapper = std::make_shared<StorageURLSource::IteratorWrapper>([glob_iterator, max_addresses]()
{
String next_uri = glob_iterator->next();
@ -865,9 +870,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
NamesAndTypesList IStorageURLBase::getVirtuals() const
{
return NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
return virtual_columns;
}
SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context)

View File

@ -1,17 +1,17 @@
#pragma once
#include <Poco/URI.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/ISource.h>
#include <Formats/FormatSettings.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/HTTPHeaderEntries.h>
#include <Storages/IStorage.h>
#include <Storages/StorageFactory.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Processors/ISource.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/IStorage.h>
#include <Storages/StorageConfiguration.h>
#include <Storages/StorageFactory.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Poco/URI.h>
namespace DB
@ -88,6 +88,8 @@ protected:
ASTPtr partition_by;
bool distributed_processing;
NamesAndTypesList virtual_columns;
virtual std::string getReadMethod() const;
virtual std::vector<std::pair<std::string, std::string>> getReadURIParams(
@ -146,7 +148,8 @@ public:
class DisclosedGlobIterator
{
public:
DisclosedGlobIterator(const String & uri_, size_t max_addresses);
DisclosedGlobIterator(const String & uri_, size_t max_addresses, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
String next();
size_t size();
private:

View File

@ -19,6 +19,7 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageURL.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/VirtualColumnUtils.h>
#include <TableFunctions/TableFunctionURLCluster.h>
@ -67,6 +68,8 @@ StorageURLCluster::StorageURLCluster(
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
void StorageURLCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context)
@ -78,18 +81,11 @@ void StorageURLCluster::addColumnsStructureToQuery(ASTPtr & query, const String
TableFunctionURLCluster::addColumnsStructureToArguments(expression_list->children, structure, context);
}
RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(ASTPtr, const ContextPtr & context) const
RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
{
auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(uri, context->getSettingsRef().glob_expansion_max_elements);
auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(uri, context->getSettingsRef().glob_expansion_max_elements, query, virtual_columns, context);
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}
NamesAndTypesList StorageURLCluster::getVirtuals() const
{
return NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
}

View File

@ -32,7 +32,7 @@ public:
std::string getName() const override { return "URLCluster"; }
NamesAndTypesList getVirtuals() const override;
NamesAndTypesList getVirtuals() const override { return virtual_columns; }
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
@ -44,6 +44,7 @@ private:
String uri;
String format_name;
String compression_method;
NamesAndTypesList virtual_columns;
};

View File

@ -20,6 +20,10 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/FilterDescription.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
@ -266,6 +270,89 @@ void filterBlockWithQuery(const ASTPtr & query, Block & block, ContextPtr contex
}
}
/// Return the default path/file virtual columns ("_path" and "_file", both
/// LowCardinality(String)) minus any whose name is already taken by a real
/// storage column, so a physical column named "_path" shadows the virtual one.
NamesAndTypesList getPathAndFileVirtualsForStorage(NamesAndTypesList storage_columns)
{
    auto path_and_file_virtuals = NamesAndTypesList{
        {"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
        {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};

    /// set_difference requires both input ranges to be sorted consistently.
    path_and_file_virtuals.sort();
    storage_columns.sort();

    const auto compare_by_name = [](const NameAndTypePair & lhs, const NameAndTypePair & rhs) { return lhs.name < rhs.name; };

    NamesAndTypesList virtuals;
    std::set_difference(
        path_and_file_virtuals.begin(), path_and_file_virtuals.end(),
        storage_columns.begin(), storage_columns.end(),
        std::back_inserter(virtuals),
        compare_by_name);

    return virtuals;
}
/// Append one row to the virtual-columns block: "_path" receives the full path,
/// "_file" the substring after the last '/', and "_idx" records the position of
/// this path in the caller's list so filtered rows can be mapped back to inputs.
/// "_path"/"_file" are filled only when present in the block; "_idx" is required.
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx)
{
    if (block.has("_path"))
        block.getByName("_path").column->assumeMutableRef().insert(path);

    if (block.has("_file"))
    {
        const auto slash_pos = path.find_last_of('/');
        assert(slash_pos != std::string::npos);
        block.getByName("_file").column->assumeMutableRef().insert(path.substr(slash_pos + 1));
    }

    block.getByName("_idx").column->assumeMutableRef().insert(idx);
}
/// Extract from `query` the part of its filter that can be evaluated using only
/// the path/file virtual columns. Returns an empty ASTPtr when there is no
/// query, no virtual columns, or nothing in the query restricts them.
ASTPtr createPathAndFileFilterAst(const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
    if (!query || virtual_columns.empty())
        return {};

    /// A sample block with a single dummy row is enough for
    /// prepareFilterBlockWithQuery to decide which conjuncts depend only on
    /// these columns. "_idx" is appended to hold the filter result later on.
    Block sample_block;
    for (const auto & column : virtual_columns)
        sample_block.insert({column.type->createColumn(), column.type, column.name});
    sample_block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
    addPathAndFileToVirtualColumns(sample_block, "path/file", 0);

    ASTPtr filter_ast;
    prepareFilterBlockWithQuery(query, context, sample_block, filter_ast);
    return filter_ast;
}
/// Fill a block with one row per path, run the precomputed filter over it and
/// return the "_idx" column: the indexes (in original order) of the paths that
/// satisfied the filter.
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context, ASTPtr filter_ast)
{
    Block filter_block;
    for (const auto & column : virtual_columns)
        filter_block.insert({column.type->createColumn(), column.type, column.name});
    /// "_idx" survives the filtering and tells us which rows remained.
    filter_block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});

    for (size_t row = 0; row != paths.size(); ++row)
        addPathAndFileToVirtualColumns(filter_block, paths[row], row);

    filterBlockWithQuery(query, filter_block, context, filter_ast);

    return filter_block.getByName("_idx").column;
}
/// Append the requested "_path"/"_file" virtual columns to a chunk produced by a
/// file-like source. Only columns actually requested are added; other names in
/// the list are ignored.
void addRequestedPathAndFileVirtualsToChunk(Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path)
{
    const size_t num_rows = chunk.getNumRows();
    for (const auto & virtual_column : requested_virtual_columns)
    {
        if (virtual_column.name == "_path")
        {
            /// Materialize the constant: the per-storage code this helper replaced
            /// (StorageFile/StorageS3/StorageURL) called convertToFullColumnIfConst()
            /// before adding the column, so keep that behavior here for consistency.
            chunk.addColumn(virtual_column.type->createColumnConst(num_rows, path)->convertToFullColumnIfConst());
        }
        else if (virtual_column.name == "_file")
        {
            /// "_file" is the path component after the last '/'; if there is no
            /// slash, find_last_of returns npos and substr(npos + 1) yields the
            /// whole string, i.e. the path itself.
            size_t last_slash_pos = path.find_last_of('/');
            auto file_name = path.substr(last_slash_pos + 1);
            chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_name)->convertToFullColumnIfConst());
        }
    }
}
}
}

View File

@ -46,6 +46,28 @@ auto extractSingleValueFromBlock(const Block & block, const String & name)
return res;
}
NamesAndTypesList getPathAndFileVirtualsForStorage(NamesAndTypesList storage_columns);
ASTPtr createPathAndFileFilterAst(const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context, ASTPtr filter_ast);
template <typename T>
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context, ASTPtr filter_ast)
{
auto indexes_column = getFilterByPathAndFileIndexes(paths, query, virtual_columns, context, filter_ast);
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
if (indexes.size() == sources.size())
return;
std::vector<T> filtered_sources;
filtered_sources.reserve(indexes.size());
for (auto index : indexes)
filtered_sources.emplace_back(std::move(sources[index]));
sources = std::move(filtered_sources);
}
void addRequestedPathAndFileVirtualsToChunk(Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path);
}
}

View File

@ -1,22 +0,0 @@
#include "getVirtualsForStorage.h"
namespace DB
{
/// Return the default virtual columns whose names are not shadowed by real
/// storage columns (superseded in this commit by
/// VirtualColumnUtils::getPathAndFileVirtualsForStorage).
NamesAndTypesList getVirtualsForStorage(const NamesAndTypesList & storage_columns_, const NamesAndTypesList & default_virtuals_)
{
    /// Work on copies: both ranges must be sorted before set_difference.
    NamesAndTypesList virtuals_sorted = default_virtuals_;
    NamesAndTypesList columns_sorted = storage_columns_;
    virtuals_sorted.sort();
    columns_sorted.sort();

    const auto compare_by_name = [](const NameAndTypePair & lhs, const NameAndTypePair & rhs) { return lhs.name < rhs.name; };

    NamesAndTypesList result;
    std::set_difference(
        virtuals_sorted.begin(), virtuals_sorted.end(),
        columns_sorted.begin(), columns_sorted.end(),
        std::back_inserter(result),
        compare_by_name);
    return result;
}
}

View File

@ -1,9 +0,0 @@
#pragma once
#include <Core/NamesAndTypes.h>
namespace DB
{
NamesAndTypesList getVirtualsForStorage(const NamesAndTypesList & storage_columns_, const NamesAndTypesList & default_virtuals_);
}

View File

@ -709,3 +709,33 @@ def test_function_signatures(cluster):
# " - storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure\n"
query_10 = f"select * from azureBlobStorage('{storage_account_url}', 'cont', 'test_signature.csv', '{account_name}', '{account_key}', 'CSV', 'auto', 'column1 UInt32')"
assert azure_query(node, query_10) == "1\n2\n3\n"
def test_filtering_by_file_or_path(cluster):
    # A WHERE condition on the _file virtual column must prune blobs before
    # reading: with three matching blobs, only one should actually be opened,
    # which we verify via the EngineFileLikeReadFiles profile event.
    node = cluster.instances["node"]
    account = "devstoreaccount1"
    key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="

    # Create three single-row blobs test_filter{1,2,3}.tsv.
    for i in (1, 2, 3):
        azure_query(
            node,
            f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_filter{i}.tsv', '{account}', '{key}', 'auto', 'auto', 'x UInt64') select {i}",
        )

    # Glob matches all three blobs, but the _file predicate should keep only one.
    node.query(
        f"select count() from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_filter*.tsv', '{account}', '{key}', 'auto', 'auto', 'x UInt64') where _file = 'test_filter1.tsv'"
    )

    # Confirm through query_log that exactly one file was read.
    node.query("SYSTEM FLUSH LOGS")
    read_files = node.query(
        "SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query like '%select%azure%test_filter%' AND type='QueryFinish'"
    )
    assert int(read_files) == 1

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash
# A WHERE on the _file/_path virtual columns must make the file() table
# function read only the matching file: expect EngineFileLikeReadFiles == 1.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Three one-row data files; the predicates below should prune two of them.
for i in 1 2 3; do
    echo "$i" > "$CLICKHOUSE_TEST_UNIQUE_NAME.data$i.tsv"
done

# Run the same pruning check once per virtual column.
for column in _file _path; do
    $CLICKHOUSE_LOCAL --print-profile-events -q "select * from file('$CLICKHOUSE_TEST_UNIQUE_NAME.data{1,2,3}.tsv', auto, 'x UInt64') where $column like '%data1%' format Null" 2>&1 | grep -F -c "EngineFileLikeReadFiles: 1"
done

rm $CLICKHOUSE_TEST_UNIQUE_NAME.data*

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Filtering on the _file/_path virtual columns must limit the s3() table
# function to a single file read (EngineFileLikeReadFiles: 1).
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

query_prefix="select * from s3('http://localhost:11111/test/{a,b,c}.tsv', auto, 'x UInt64, y UInt64, z UInt64')"
$CLICKHOUSE_LOCAL --print-profile-events -q "$query_prefix where _file = 'a.tsv' format Null" 2>&1 | grep -F -c "EngineFileLikeReadFiles: 1"
$CLICKHOUSE_LOCAL --print-profile-events -q "$query_prefix where _path = 'test/a.tsv' format Null" 2>&1 | grep -F -c "EngineFileLikeReadFiles: 1"

View File

@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Filtering on the _file/_path virtual columns must limit the url() table
# function to a single file read (EngineFileLikeReadFiles: 1).
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

query_prefix="select * from url('http://localhost:11111/test/{a,b,c}.tsv', auto, 'x UInt64, y UInt64, z UInt64')"
$CLICKHOUSE_LOCAL --print-profile-events -q "$query_prefix where _file = 'a.tsv' format Null" 2>&1 | grep -F -c "EngineFileLikeReadFiles: 1"
$CLICKHOUSE_LOCAL --print-profile-events -q "$query_prefix where _path = '/test/a.tsv' format Null" 2>&1 | grep -F -c "EngineFileLikeReadFiles: 1"

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
# Check that filtering by the _file/_path virtual columns prunes reads in the
# hdfs() table function: of the three files matched by the glob, only one
# should actually be read, as reported by the EngineFileLikeReadFiles event.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Three single-row files on HDFS; truncate-on-insert keeps reruns clean.
$CLICKHOUSE_LOCAL -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data1.tsv') select 1 settings hdfs_truncate_on_insert=1;"
$CLICKHOUSE_LOCAL -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data2.tsv') select 2 settings hdfs_truncate_on_insert=1;"
$CLICKHOUSE_LOCAL -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data3.tsv') select 3 settings hdfs_truncate_on_insert=1;"
# Expect exactly one file read for both the _file and the _path predicate.
$CLICKHOUSE_LOCAL --print-profile-events -q "select * from hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data*.tsv', auto, 'x UInt64') where _file like '%data1%' format Null" 2>&1 | grep -F -c "EngineFileLikeReadFiles: 1"
$CLICKHOUSE_LOCAL --print-profile-events -q "select * from hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data*.tsv', auto, 'x UInt64') where _path like '%data1%' format Null" 2>&1 | grep -F -c "EngineFileLikeReadFiles: 1"