From 007353a2dda0f111e4a85c216c8e99eb76ee8d7b Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 22 Nov 2023 18:12:36 +0000 Subject: [PATCH 1/7] Add _size virtual column to s3/file/hdfs/url/azureBlobStorage engines --- src/Storages/HDFS/StorageHDFS.cpp | 5 +- src/Storages/HDFS/StorageHDFS.h | 1 + src/Storages/HDFS/StorageHDFSCluster.cpp | 2 +- src/Storages/S3Queue/S3QueueSource.cpp | 2 +- src/Storages/S3Queue/StorageS3Queue.cpp | 2 +- src/Storages/StorageAzureBlob.cpp | 8 +- src/Storages/StorageAzureBlob.h | 1 + src/Storages/StorageAzureBlobCluster.cpp | 2 +- src/Storages/StorageFile.cpp | 9 +- src/Storages/StorageS3.cpp | 695 +++++++++--------- src/Storages/StorageS3.h | 1 + src/Storages/StorageS3Cluster.cpp | 2 +- src/Storages/StorageURL.cpp | 10 +- src/Storages/StorageURL.h | 1 + src/Storages/StorageURLCluster.cpp | 2 +- src/Storages/VirtualColumnUtils.cpp | 16 +- src/Storages/VirtualColumnUtils.h | 6 +- ..._file_engine_size_virtual_column.reference | 12 + .../02921_file_engine_size_virtual_column.sh | 23 + ...rl_s3_engine_size_virtual_column.reference | 12 + ...02922_url_s3_engine_size_virtual_column.sh | 13 + ..._hdfs_engine_size_virtual_column.reference | 6 + .../02923_hdfs_engine_size_virtual_column.sh | 15 + 23 files changed, 489 insertions(+), 357 deletions(-) create mode 100644 tests/queries/0_stateless/02921_file_engine_size_virtual_column.reference create mode 100755 tests/queries/0_stateless/02921_file_engine_size_virtual_column.sh create mode 100644 tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.reference create mode 100755 tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.sh create mode 100644 tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.reference create mode 100755 tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.sh diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index 7d845dac57d..a911862aa1c 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -238,7 +238,7 @@ StorageHDFS::StorageHDFS( storage_metadata.setComment(comment); setInMemoryMetadata(storage_metadata); - virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); + virtual_columns = VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); } namespace @@ -540,6 +540,7 @@ bool HDFSSource::initialize() } current_path = path_with_info.path; + current_file_size = path_with_info.info ? std::optional(path_with_info.info->size) : std::nullopt; QueryPipelineBuilder builder; std::optional num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(path_with_info) : std::nullopt; @@ -613,7 +614,7 @@ Chunk HDFSSource::generate() if (input_format) chunk_size = input_format->getApproxBytesReadForChunk(); progress(num_rows, chunk_size ? 
chunk_size : chunk.bytes()); - VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, current_path); + VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(chunk, requested_virtual_columns, current_path, current_file_size); return chunk; } diff --git a/src/Storages/HDFS/StorageHDFS.h b/src/Storages/HDFS/StorageHDFS.h index ffbf4e93ff9..9e53f1bd87c 100644 --- a/src/Storages/HDFS/StorageHDFS.h +++ b/src/Storages/HDFS/StorageHDFS.h @@ -181,6 +181,7 @@ private: std::unique_ptr pipeline; std::unique_ptr reader; String current_path; + std::optional current_file_size; /// Recreate ReadBuffer and PullingPipelineExecutor for each file. bool initialize(); diff --git a/src/Storages/HDFS/StorageHDFSCluster.cpp b/src/Storages/HDFS/StorageHDFSCluster.cpp index 83655b06cc8..bff22936e95 100644 --- a/src/Storages/HDFS/StorageHDFSCluster.cpp +++ b/src/Storages/HDFS/StorageHDFSCluster.cpp @@ -66,7 +66,7 @@ StorageHDFSCluster::StorageHDFSCluster( storage_metadata.setConstraints(constraints_); setInMemoryMetadata(storage_metadata); - virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); + virtual_columns = VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); } void StorageHDFSCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp index 1afd17edbe1..27bec039f96 100644 --- a/src/Storages/S3Queue/S3QueueSource.cpp +++ b/src/Storages/S3Queue/S3QueueSource.cpp @@ -210,7 +210,7 @@ Chunk StorageS3QueueSource::generate() file_status->processed_rows += chunk.getNumRows(); processed_rows_from_file += chunk.getNumRows(); - VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath()); + VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath(), reader.getKeyWithInfo().info->size); return chunk; } } diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index 99699aab709..c64bb32d7a7 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -149,7 +149,7 @@ StorageS3Queue::StorageS3Queue( createOrCheckMetadata(storage_metadata); setInMemoryMetadata(storage_metadata); - virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); + virtual_columns = VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); }); LOG_INFO(log, "Using zookeeper path: {}", zk_path.string()); diff --git a/src/Storages/StorageAzureBlob.cpp b/src/Storages/StorageAzureBlob.cpp index b43f25b0fff..fd4678ade59 100644 --- a/src/Storages/StorageAzureBlob.cpp +++ b/src/Storages/StorageAzureBlob.cpp @@ -494,7 +494,7 @@ StorageAzureBlob::StorageAzureBlob( for (const auto & key : configuration.blobs_paths) objects.emplace_back(key); - virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); + virtual_columns = VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); } void 
StorageAzureBlob::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) @@ -1011,7 +1011,11 @@ Chunk StorageAzureBlobSource::generate() if (const auto * input_format = reader.getInputFormat()) chunk_size = input_format->getApproxBytesReadForChunk(); progress(num_rows, chunk_size ? chunk_size : chunk.bytes()); - VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, fs::path(container) / reader.getRelativePath()); + VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk( + chunk, + requested_virtual_columns, + fs::path(container) / reader.getRelativePath(), + reader.getRelativePathWithMetadata().metadata.size_bytes); return chunk; } diff --git a/src/Storages/StorageAzureBlob.h b/src/Storages/StorageAzureBlob.h index b97dee0caed..a80abce99f1 100644 --- a/src/Storages/StorageAzureBlob.h +++ b/src/Storages/StorageAzureBlob.h @@ -331,6 +331,7 @@ private: const String & getRelativePath() const { return relative_path_with_metadata.relative_path; } const RelativePathWithMetadata & getRelativePathWithMetadata() const { return relative_path_with_metadata; } const IInputFormat * getInputFormat() const { return dynamic_cast(source.get()); } + ReadBuffer & getReadBuffer() const { return *read_buf; } private: RelativePathWithMetadata relative_path_with_metadata; diff --git a/src/Storages/StorageAzureBlobCluster.cpp b/src/Storages/StorageAzureBlobCluster.cpp index 7592a6e6acc..b8f95458379 100644 --- a/src/Storages/StorageAzureBlobCluster.cpp +++ b/src/Storages/StorageAzureBlobCluster.cpp @@ -57,7 +57,7 @@ StorageAzureBlobCluster::StorageAzureBlobCluster( storage_metadata.setConstraints(constraints_); setInMemoryMetadata(storage_metadata); - virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); + virtual_columns = VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); } void StorageAzureBlobCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index dd527a50794..9769ea09d80 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -859,7 +859,7 @@ void StorageFile::setStorageMetadata(CommonArguments args) storage_metadata.setComment(args.comment); setInMemoryMetadata(storage_metadata); - virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); + virtual_columns = VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); } @@ -1149,6 +1149,7 @@ public: chassert(file_enumerator); current_path = fmt::format("{}::{}", archive_reader->getPath(), *filename_override); + current_file_size = file_enumerator->getFileInfo().uncompressed_size; if (need_only_count && tryGetCountFromCache(current_archive_stat)) continue; @@ -1177,6 +1178,7 @@ public: { struct stat file_stat; file_stat = getFileStat(current_path, storage->use_table_fd, storage->table_fd, storage->getName()); + current_file_size = file_stat.st_size; if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0) continue; @@ -1243,8 +1245,8 @@ public: progress(num_rows, chunk_size ? chunk_size : chunk.bytes()); /// Enrich with virtual columns. 
- VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk( - chunk, requested_virtual_columns, current_path, filename_override.has_value() ? &filename_override.value() : nullptr); + VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk( + chunk, requested_virtual_columns, current_path, current_file_size, filename_override.has_value() ? &filename_override.value() : nullptr); return chunk; } @@ -1305,6 +1307,7 @@ private: StorageSnapshotPtr storage_snapshot; FilesIteratorPtr files_iterator; String current_path; + std::optional current_file_size; struct stat current_archive_stat; std::optional filename_override; Block sample_block; diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index bdbba5abd96..a00f28a733f 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -1,90 +1,90 @@ -#include "config.h" #include #include "Parsers/ASTCreateQuery.h" +#include "config.h" #if USE_AWS_S3 -#include +# include -#include -#include -#include -#include +# include +# include +# include +# include -#include -#include +# include +# include -#include -#include +# include +# include -#include -#include -#include -#include -#include -#include -#include -#include -#include +# include +# include +# include +# include +# include +# include +# include +# include +# include -#include -#include -#include +# include +# include +# include -#include -#include +# include +# include -#include -#include +# include +# include -#include -#include -#include -#include -#include +# include +# include +# include +# include +# include -#include +# include -#include +# include -#include +# include -#include -#include -#include -#include +# include +# include +# include +# include -#include -#include -#include -#include +# include +# include +# include +# include -#include +# include -#ifdef __clang__ -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant" -#endif -#include -#ifdef __clang__ -# pragma clang diagnostic pop -#endif +# ifdef __clang__ +# pragma clang diagnostic push +# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant" +# endif +# include +# ifdef __clang__ +# pragma clang diagnostic pop +# endif namespace fs = std::filesystem; namespace CurrentMetrics { - extern const Metric StorageS3Threads; - extern const Metric StorageS3ThreadsActive; - extern const Metric StorageS3ThreadsScheduled; +extern const Metric StorageS3Threads; +extern const Metric StorageS3ThreadsActive; +extern const Metric StorageS3ThreadsScheduled; } namespace ProfileEvents { - extern const Event S3DeleteObjects; - extern const Event S3ListObjects; - extern const Event EngineFileLikeReadFiles; +extern const Event S3DeleteObjects; +extern const Event S3ListObjects; +extern const Event EngineFileLikeReadFiles; } namespace DB @@ -93,37 +93,36 @@ namespace DB static const std::unordered_set required_configuration_keys = { "url", }; -static const std::unordered_set optional_configuration_keys = { - "format", - "compression", - "compression_method", - "structure", - "access_key_id", - "secret_access_key", - "filename", - "use_environment_credentials", - "max_single_read_retries", - "min_upload_part_size", - "upload_part_size_multiply_factor", - "upload_part_size_multiply_parts_count_threshold", - "max_single_part_upload_size", - "max_connections", - "expiration_window_seconds", - "no_sign_request" -}; +static const std::unordered_set optional_configuration_keys + = {"format", + "compression", + "compression_method", + "structure", + 
"access_key_id", + "secret_access_key", + "filename", + "use_environment_credentials", + "max_single_read_retries", + "min_upload_part_size", + "upload_part_size_multiply_factor", + "upload_part_size_multiply_parts_count_threshold", + "max_single_part_upload_size", + "max_connections", + "expiration_window_seconds", + "no_sign_request"}; namespace ErrorCodes { - extern const int CANNOT_PARSE_TEXT; - extern const int BAD_ARGUMENTS; - extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; - extern const int S3_ERROR; - extern const int UNEXPECTED_EXPRESSION; - extern const int DATABASE_ACCESS_DENIED; - extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; - extern const int NOT_IMPLEMENTED; - extern const int CANNOT_COMPILE_REGEXP; - extern const int FILE_DOESNT_EXIST; +extern const int CANNOT_PARSE_TEXT; +extern const int BAD_ARGUMENTS; +extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +extern const int S3_ERROR; +extern const int UNEXPECTED_EXPRESSION; +extern const int DATABASE_ACCESS_DENIED; +extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; +extern const int NOT_IMPLEMENTED; +extern const int CANNOT_COMPILE_REGEXP; +extern const int FILE_DOESNT_EXIST; } class IOutputFormat; @@ -148,7 +147,8 @@ public: , virtual_columns(virtual_columns_) , read_keys(read_keys_) , request_settings(request_settings_) - , list_objects_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) + , list_objects_pool( + CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) , list_objects_scheduler(threadPoolCallbackRunner(list_objects_pool, "ListObjects")) , file_progress_callback(file_progress_callback_) { @@ -174,8 +174,8 @@ public: matcher = std::make_unique(makeRegexpPatternFromGlobs(globbed_uri.key)); if (!matcher->ok()) - throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP, - "Cannot compile regex from glob ({}): {}", globbed_uri.key, matcher->error()); + throw Exception( + ErrorCodes::CANNOT_COMPILE_REGEXP, "Cannot compile regex from glob ({}): {}", globbed_uri.key, matcher->error()); recursive = globbed_uri.key == "/**" ? 
true : false; fillInternalBufferAssumeLocked(); @@ -187,15 +187,9 @@ public: return nextAssumeLocked(); } - size_t objectsCount() - { - return buffer.size(); - } + size_t objectsCount() { return buffer.size(); } - ~Impl() - { - list_objects_pool.wait(); - } + ~Impl() { list_objects_pool.wait(); } private: using ListObjectsOutcome = Aws::S3::Model::ListObjectsV2Outcome; @@ -251,9 +245,13 @@ private: if (!outcome.IsSuccess()) { - throw S3Exception(outcome.GetError().GetErrorType(), "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}", - quoteString(request.GetBucket()), quoteString(request.GetPrefix()), - backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage())); + throw S3Exception( + outcome.GetError().GetErrorType(), + "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}", + quoteString(request.GetBucket()), + quoteString(request.GetPrefix()), + backQuote(outcome.GetError().GetExceptionName()), + quoteString(outcome.GetError().GetMessage())); } const auto & result_batch = outcome.GetResult().GetContents(); @@ -280,8 +278,7 @@ private: String key = row.GetKey(); if (recursive || re2::RE2::FullMatch(key, *matcher)) { - S3::ObjectInfo info = - { + S3::ObjectInfo info = { .size = size_t(row.GetSize()), .last_modification_time = row.GetLastModified().Millis() / 1000, }; @@ -298,7 +295,8 @@ private: if (!is_initialized) { - filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(globbed_uri.bucket) / temp_buffer.front()->key, getContext()); + filter_ast = VirtualColumnUtils::createPathAndFileFilterAst( + query, virtual_columns, fs::path(globbed_uri.bucket) / temp_buffer.front()->key, getContext()); is_initialized = true; } @@ -315,10 +313,8 @@ private: buffer = std::move(temp_buffer); if (file_progress_callback) - { for (const auto & key_with_info : buffer) file_progress_callback(FileProgress(0, key_with_info->info->size)); - } /// Set iterator only after the whole batch is processed buffer_iter = buffer.begin(); @@ -329,17 +325,19 @@ private: std::future listObjectsAsync() { - return list_objects_scheduler([this] - { - ProfileEvents::increment(ProfileEvents::S3ListObjects); - auto outcome = client->ListObjectsV2(request); + return list_objects_scheduler( + [this] + { + ProfileEvents::increment(ProfileEvents::S3ListObjects); + auto outcome = client->ListObjectsV2(request); - /// Outcome failure will be handled on the caller side. - if (outcome.IsSuccess()) - request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); + /// Outcome failure will be handled on the caller side. 
+ if (outcome.IsSuccess()) + request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); - return outcome; - }, Priority{}); + return outcome; + }, + Priority{}); } std::mutex mutex; @@ -376,7 +374,8 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator( KeysWithInfo * read_keys_, const S3Settings::RequestSettings & request_settings_, std::function file_progress_callback_) - : pimpl(std::make_shared(client_, globbed_uri_, query, virtual_columns_, context, read_keys_, request_settings_, file_progress_callback_)) + : pimpl(std::make_shared( + client_, globbed_uri_, query, virtual_columns_, context, read_keys_, request_settings_, file_progress_callback_)) { } @@ -429,10 +428,8 @@ public: } if (read_keys_) - { for (const auto & key : keys) read_keys_->push_back(std::make_shared(key)); - } } KeyWithInfoPtr next() @@ -451,10 +448,7 @@ public: return std::make_shared(key, info); } - size_t objectsCount() - { - return keys.size(); - } + size_t objectsCount() { return keys.size(); } private: Strings keys; @@ -480,8 +474,7 @@ StorageS3Source::KeysIterator::KeysIterator( KeysWithInfo * read_keys, std::function file_progress_callback_) : pimpl(std::make_shared( - client_, version_id_, keys_, bucket_, request_settings_, - query, virtual_columns_, context, read_keys, file_progress_callback_)) + client_, version_id_, keys_, bucket_, request_settings_, query, virtual_columns_, context, read_keys, file_progress_callback_)) { } @@ -495,12 +488,13 @@ size_t StorageS3Source::KeysIterator::estimatedKeysCount() return pimpl->objectsCount(); } -StorageS3Source::ReadTaskIterator::ReadTaskIterator( - const DB::ReadTaskCallback & callback_, - size_t max_threads_count) - : callback(callback_) +StorageS3Source::ReadTaskIterator::ReadTaskIterator(const DB::ReadTaskCallback & callback_, size_t max_threads_count) : callback(callback_) { - ThreadPool pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, max_threads_count); + ThreadPool pool( + CurrentMetrics::StorageS3Threads, + CurrentMetrics::StorageS3ThreadsActive, + CurrentMetrics::StorageS3ThreadsScheduled, + max_threads_count); auto pool_scheduler = threadPoolCallbackRunner(pool, "S3ReadTaskItr"); std::vector> keys; @@ -565,7 +559,8 @@ StorageS3Source::StorageS3Source( , file_iterator(file_iterator_) , max_parsing_threads(max_parsing_threads_) , need_only_count(need_only_count_) - , create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) + , create_reader_pool( + CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) , create_reader_scheduler(threadPoolCallbackRunner(create_reader_pool, "CreateS3Reader")) { } @@ -592,13 +587,14 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader() if (!key_with_info->info) key_with_info->info = S3::getObjectInfo(*client, bucket, key_with_info->key, version_id, request_settings); - } - while (getContext()->getSettingsRef().s3_skip_empty_files && key_with_info->info->size == 0); + } while (getContext()->getSettingsRef().s3_skip_empty_files && key_with_info->info->size == 0); QueryPipelineBuilder builder; std::shared_ptr source; std::unique_ptr read_buf; - std::optional num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? 
tryGetNumRowsFromCache(*key_with_info) : std::nullopt; + std::optional num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files + ? tryGetNumRowsFromCache(*key_with_info) + : std::nullopt; if (num_rows_from_cache) { /// We should not return single chunk with all number of rows, @@ -647,10 +643,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader() /// Add ExtractColumnsTransform to extract requested columns/subcolumns /// from chunk read by IInputFormat. - builder.addSimpleTransform([&](const Block & header) - { - return std::make_shared(header, requested_columns); - }); + builder.addSimpleTransform([&](const Block & header) { return std::make_shared(header, requested_columns); }); auto pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder))); auto current_reader = std::make_unique(*pipeline); @@ -682,18 +675,25 @@ std::unique_ptr StorageS3Source::createS3ReadBuffer(const String & k } return std::make_unique( - client, bucket, key, version_id, request_settings, read_settings, - /*use_external_buffer*/ false, /*offset_*/ 0, /*read_until_position_*/ 0, - /*restricted_seek_*/ false, object_size); + client, + bucket, + key, + version_id, + request_settings, + read_settings, + /*use_external_buffer*/ false, + /*offset_*/ 0, + /*read_until_position_*/ 0, + /*restricted_seek_*/ false, + object_size); } -std::unique_ptr StorageS3Source::createAsyncS3ReadBuffer( - const String & key, const ReadSettings & read_settings, size_t object_size) +std::unique_ptr +StorageS3Source::createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size) { auto context = getContext(); auto read_buffer_creator = - [this, read_settings, object_size] - (const std::string & path, size_t read_until_position) -> std::unique_ptr + [this, read_settings, object_size](const std::string & path, size_t read_until_position) -> std::unique_ptr { return std::make_unique( client, @@ -702,10 +702,10 @@ std::unique_ptr StorageS3Source::createAsyncS3ReadBuffer( version_id, request_settings, read_settings, - /* use_external_buffer */true, - /* offset */0, + /* use_external_buffer */ true, + /* offset */ 0, read_until_position, - /* restricted_seek */true, + /* restricted_seek */ true, object_size); }; @@ -713,7 +713,8 @@ std::unique_ptr StorageS3Source::createAsyncS3ReadBuffer( std::move(read_buffer_creator), StoredObjects{StoredObject{key, object_size}}, read_settings, - /* cache_log */nullptr, /* use_external_buffer */true); + /* cache_log */ nullptr, + /* use_external_buffer */ true); auto modified_settings{read_settings}; /// FIXME: Changing this setting to default value breaks something around parquet reading @@ -721,8 +722,7 @@ std::unique_ptr StorageS3Source::createAsyncS3ReadBuffer( auto & pool_reader = context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); auto async_reader = std::make_unique( - std::move(s3_impl), pool_reader, modified_settings, - context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog()); + std::move(s3_impl), pool_reader, modified_settings, context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog()); async_reader->setReadUntilEnd(); if (read_settings.remote_fs_prefetch) @@ -763,7 +763,7 @@ Chunk StorageS3Source::generate() if (const auto * input_format = reader.getInputFormat()) chunk_size = reader.getInputFormat()->getApproxBytesReadForChunk(); progress(num_rows, chunk_size ? 
chunk_size : chunk.bytes()); - VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath()); + VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath(), reader.getFileSize()); return chunk; } @@ -798,10 +798,7 @@ std::optional StorageS3Source::tryGetNumRowsFromCache(const KeyWithInfo { String source = fs::path(url_host_and_port) / bucket / key_with_info.key; auto cache_key = getKeyForSchemaCache(source, format, format_settings, getContext()); - auto get_last_mod_time = [&]() -> std::optional - { - return key_with_info.info->last_modification_time; - }; + auto get_last_mod_time = [&]() -> std::optional { return key_with_info.info->last_modification_time; }; return StorageS3::getSchemaCache(getContext()).tryGetNumRows(cache_key, get_last_mod_time); } @@ -818,9 +815,7 @@ public: const StorageS3::Configuration & configuration_, const String & bucket, const String & key) - : SinkToStorage(sample_block_) - , sample_block(sample_block_) - , format_settings(format_settings_) + : SinkToStorage(sample_block_), sample_block(sample_block_), format_settings(format_settings_) { write_buf = wrapWriteBufferWithCompressionMethod( std::make_unique( @@ -834,8 +829,7 @@ public: context->getWriteSettings()), compression_method, 3); - writer - = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings); + writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings); } String getName() const override { return "StorageS3Sink"; } @@ -944,15 +938,7 @@ public: validateKey(partition_key); return std::make_shared( - format, - sample_block, - context, - format_settings, - compression_method, - configuration, - partition_bucket, - partition_key - ); + format, sample_block, context, format_settings, compression_method, configuration, partition_bucket, partition_key); } private: @@ -1027,7 +1013,8 @@ StorageS3::StorageS3( { /// We don't allow special columns in S3 storage. 
if (!columns_.hasOnlyOrdinary()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table engine S3 doesn't support special columns like MATERIALIZED, ALIAS or EPHEMERAL"); + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "Table engine S3 doesn't support special columns like MATERIALIZED, ALIAS or EPHEMERAL"); storage_metadata.setColumns(columns_); } @@ -1035,7 +1022,7 @@ StorageS3::StorageS3( storage_metadata.setComment(comment); setInMemoryMetadata(storage_metadata); - virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); + virtual_columns = VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); } std::shared_ptr StorageS3::createFileIterator( @@ -1049,21 +1036,35 @@ std::shared_ptr StorageS3::createFileIterator( { if (distributed_processing) { - return std::make_shared(local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads); + return std::make_shared( + local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads); } else if (configuration.withGlobs()) { /// Iterate through disclosed globs and make a source for each file return std::make_shared( - *configuration.client, configuration.url, query, virtual_columns, - local_context, read_keys, configuration.request_settings, file_progress_callback); + *configuration.client, + configuration.url, + query, + virtual_columns, + local_context, + read_keys, + configuration.request_settings, + file_progress_callback); } else { return std::make_shared( - *configuration.client, configuration.url.version_id, configuration.keys, - configuration.url.bucket, configuration.request_settings, query, - virtual_columns, local_context, read_keys, file_progress_callback); + *configuration.client, + configuration.url.version_id, + configuration.keys, + configuration.url.bucket, + configuration.request_settings, + query, + virtual_columns, + local_context, + read_keys, + file_progress_callback); } } @@ -1099,7 +1100,13 @@ Pipe StorageS3::read( Pipes pipes; std::shared_ptr iterator_wrapper = createFileIterator( - query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback()); + query_configuration, + distributed_processing, + local_context, + query_info.query, + virtual_columns, + nullptr, + local_context->getFileProgressCallback()); size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount(); if (estimated_keys_count > 1) @@ -1108,7 +1115,8 @@ Pipe StorageS3::read( /// Disclosed glob iterator can underestimate the amount of keys in some cases. We will keep one stream for this particular case. 
num_streams = 1; - auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals()); + auto read_from_format_info + = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals()); bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty()) && local_context->getSettingsRef().optimize_count_from_files; @@ -1141,7 +1149,8 @@ Pipe StorageS3::read( return Pipe::unitePipes(std::move(pipes)); } -SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) +SinkToStoragePtr +StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) { auto query_configuration = updateConfigurationAndGetCopy(local_context); @@ -1168,12 +1177,20 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr else { if (query_configuration.withGlobs()) - throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, - "S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key); + throw Exception( + ErrorCodes::DATABASE_ACCESS_DENIED, + "S3 key '{}' contains globs, so the table is in readonly mode", + query_configuration.url.key); bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert; - if (!truncate_in_insert && S3::objectExists(*query_configuration.client, query_configuration.url.bucket, query_configuration.keys.back(), query_configuration.url.version_id, query_configuration.request_settings)) + if (!truncate_in_insert + && S3::objectExists( + *query_configuration.client, + query_configuration.url.bucket, + query_configuration.keys.back(), + query_configuration.url.version_id, + query_configuration.request_settings)) { if (local_context->getSettingsRef().s3_create_new_file_on_insert) { @@ -1183,10 +1200,15 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr String new_key; do { - new_key = first_key.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos)); + new_key + = first_key.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos)); ++index; - } - while (S3::objectExists(*query_configuration.client, query_configuration.url.bucket, new_key, query_configuration.url.version_id, query_configuration.request_settings)); + } while (S3::objectExists( + *query_configuration.client, + query_configuration.url.bucket, + new_key, + query_configuration.url.version_id, + query_configuration.request_settings)); query_configuration.keys.push_back(new_key); configuration.keys.push_back(new_key); @@ -1198,7 +1220,8 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr "Object in bucket {} with key {} already exists. 
" "If you want to overwrite it, enable setting s3_truncate_on_insert, if you " "want to create a new file on each insert, enable setting s3_create_new_file_on_insert", - query_configuration.url.bucket, query_configuration.keys.back()); + query_configuration.url.bucket, + query_configuration.keys.back()); } } @@ -1345,10 +1368,12 @@ void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configur configuration.auth_settings.secret_access_key = collection.getOrDefault("secret_access_key", ""); configuration.auth_settings.use_environment_credentials = collection.getOrDefault("use_environment_credentials", 1); configuration.auth_settings.no_sign_request = collection.getOrDefault("no_sign_request", false); - configuration.auth_settings.expiration_window_seconds = collection.getOrDefault("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS); + configuration.auth_settings.expiration_window_seconds + = collection.getOrDefault("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS); configuration.format = collection.getOrDefault("format", configuration.format); - configuration.compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); + configuration.compression_method + = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); configuration.structure = collection.getOrDefault("structure", "auto"); configuration.request_settings = S3Settings::RequestSettings(collection); @@ -1378,9 +1403,10 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context /// with optional headers() function if (engine_args.empty() || engine_args.size() > 5) - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, - "Storage S3 requires 1 to 5 arguments: " - "url, [NOSIGN | access_key_id, secret_access_key], name of used format and [compression_method]"); + throw Exception( + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Storage S3 requires 1 to 5 arguments: " + "url, [NOSIGN | access_key_id, secret_access_key], name of used format and [compression_method]"); auto * header_it = StorageURL::collectHeaders(engine_args, configuration.headers_from_ast, local_context); if (header_it != engine_args.end()) @@ -1390,11 +1416,8 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context); /// Size -> argument indexes - static std::unordered_map> size_to_engine_args - { - {1, {{}}}, - {5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"compression_method", 4}}} - }; + static std::unordered_map> size_to_engine_args{ + {1, {{}}}, {5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"compression_method", 4}}}}; std::unordered_map engine_args_to_idx; bool no_sign_request = false; @@ -1456,13 +1479,16 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context configuration.format = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["format"]], "format"); if (engine_args_to_idx.contains("compression_method")) - configuration.compression_method = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["compression_method"]], "compression_method"); + configuration.compression_method + = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["compression_method"]], "compression_method"); if (engine_args_to_idx.contains("access_key_id")) - configuration.auth_settings.access_key_id = 
checkAndGetLiteralArgument(engine_args[engine_args_to_idx["access_key_id"]], "access_key_id"); + configuration.auth_settings.access_key_id + = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["access_key_id"]], "access_key_id"); if (engine_args_to_idx.contains("secret_access_key")) - configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["secret_access_key"]], "secret_access_key"); + configuration.auth_settings.secret_access_key + = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["secret_access_key"]], "secret_access_key"); configuration.auth_settings.no_sign_request = no_sign_request; } @@ -1478,105 +1504,109 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context } ColumnsDescription StorageS3::getTableStructureFromData( - const StorageS3::Configuration & configuration, - const std::optional & format_settings, - ContextPtr ctx) + const StorageS3::Configuration & configuration, const std::optional & format_settings, ContextPtr ctx) { return getTableStructureFromDataImpl(configuration, format_settings, ctx); } namespace { - class ReadBufferIterator : public IReadBufferIterator, WithContext +class ReadBufferIterator : public IReadBufferIterator, WithContext +{ +public: + ReadBufferIterator( + std::shared_ptr file_iterator_, + const StorageS3Source::KeysWithInfo & read_keys_, + const StorageS3::Configuration & configuration_, + const std::optional & format_settings_, + const ContextPtr & context_) + : WithContext(context_) + , file_iterator(file_iterator_) + , read_keys(read_keys_) + , configuration(configuration_) + , format_settings(format_settings_) + , prev_read_keys_size(read_keys_.size()) { - public: - ReadBufferIterator( - std::shared_ptr file_iterator_, - const StorageS3Source::KeysWithInfo & read_keys_, - const StorageS3::Configuration & configuration_, - const std::optional & format_settings_, - const ContextPtr & context_) - : WithContext(context_) - , file_iterator(file_iterator_) - , read_keys(read_keys_) - , configuration(configuration_) - , format_settings(format_settings_) - , prev_read_keys_size(read_keys_.size()) - { - } + } - std::unique_ptr next() override + std::unique_ptr next() override + { + while (true) { - while (true) + current_key_with_info = (*file_iterator)(); + + if (!current_key_with_info || current_key_with_info->key.empty()) { - current_key_with_info = (*file_iterator)(); + if (first) + throw Exception( + ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, + "Cannot extract table structure from {} format file, because there are no files with provided path " + "in S3 or all files are empty. You must specify table structure manually", + configuration.format); - if (!current_key_with_info || current_key_with_info->key.empty()) - { - if (first) - throw Exception( - ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, - "Cannot extract table structure from {} format file, because there are no files with provided path " - "in S3 or all files are empty. You must specify table structure manually", - configuration.format); + return nullptr; + } + /// S3 file iterator could get new keys after new iteration, check them in schema cache. 
+ if (getContext()->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size) + { + columns_from_cache = StorageS3::tryGetColumnsFromCache( + read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, getContext()); + prev_read_keys_size = read_keys.size(); + if (columns_from_cache) return nullptr; - } + } - /// S3 file iterator could get new keys after new iteration, check them in schema cache. - if (getContext()->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size) - { - columns_from_cache = StorageS3::tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, getContext()); - prev_read_keys_size = read_keys.size(); - if (columns_from_cache) - return nullptr; - } + if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info->info && current_key_with_info->info->size == 0) + continue; - if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info->info && current_key_with_info->info->size == 0) - continue; - - int zstd_window_log_max = static_cast(getContext()->getSettingsRef().zstd_window_log_max); - auto impl = std::make_unique(configuration.client, configuration.url.bucket, current_key_with_info->key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings()); - if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof()) - { - first = false; - return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info->key, configuration.compression_method), zstd_window_log_max); - } + int zstd_window_log_max = static_cast(getContext()->getSettingsRef().zstd_window_log_max); + auto impl = std::make_unique( + configuration.client, + configuration.url.bucket, + current_key_with_info->key, + configuration.url.version_id, + configuration.request_settings, + getContext()->getReadSettings()); + if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof()) + { + first = false; + return wrapReadBufferWithCompressionMethod( + std::move(impl), + chooseCompressionMethod(current_key_with_info->key, configuration.compression_method), + zstd_window_log_max); } } + } - std::optional getCachedColumns() override - { - return columns_from_cache; - } + std::optional getCachedColumns() override { return columns_from_cache; } - void setNumRowsToLastFile(size_t num_rows) override - { - if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3) - return; + void setNumRowsToLastFile(size_t num_rows) override + { + if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3) + return; - String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info->key; - auto key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext()); - StorageS3::getSchemaCache(getContext()).addNumRows(key, num_rows); - } + String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) + / configuration.url.bucket / current_key_with_info->key; + auto key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext()); + StorageS3::getSchemaCache(getContext()).addNumRows(key, num_rows); + } - private: - std::shared_ptr file_iterator; - const StorageS3Source::KeysWithInfo & read_keys; - const StorageS3::Configuration & configuration; - const 
std::optional & format_settings; - std::optional columns_from_cache; - StorageS3Source::KeyWithInfoPtr current_key_with_info; - size_t prev_read_keys_size; - bool first = true; - }; +private: + std::shared_ptr file_iterator; + const StorageS3Source::KeysWithInfo & read_keys; + const StorageS3::Configuration & configuration; + const std::optional & format_settings; + std::optional columns_from_cache; + StorageS3Source::KeyWithInfoPtr current_key_with_info; + size_t prev_read_keys_size; + bool first = true; +}; } ColumnsDescription StorageS3::getTableStructureFromDataImpl( - const Configuration & configuration, - const std::optional & format_settings, - ContextPtr ctx) + const Configuration & configuration, const std::optional & format_settings, ContextPtr ctx) { KeysWithInfo read_keys; @@ -1606,60 +1636,60 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( void registerStorageS3Impl(const String & name, StorageFactory & factory) { - factory.registerStorage(name, [](const StorageFactory::Arguments & args) - { - auto & engine_args = args.engine_args; - if (engine_args.empty()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); - - auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext()); - // Use format settings from global server context + settings from - // the SETTINGS clause of the create query. Settings from current - // session and user are ignored. - std::optional format_settings; - if (args.storage_def->settings) + factory.registerStorage( + name, + [](const StorageFactory::Arguments & args) { - FormatFactorySettings user_format_settings; + auto & engine_args = args.engine_args; + if (engine_args.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); - // Apply changed settings from global context, but ignore the - // unknown ones, because we only have the format settings here. - const auto & changes = args.getContext()->getSettingsRef().changes(); - for (const auto & change : changes) + auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext()); + // Use format settings from global server context + settings from + // the SETTINGS clause of the create query. Settings from current + // session and user are ignored. + std::optional format_settings; + if (args.storage_def->settings) { - if (user_format_settings.has(change.name)) - user_format_settings.set(change.name, change.value); + FormatFactorySettings user_format_settings; + + // Apply changed settings from global context, but ignore the + // unknown ones, because we only have the format settings here. + const auto & changes = args.getContext()->getSettingsRef().changes(); + for (const auto & change : changes) + if (user_format_settings.has(change.name)) + user_format_settings.set(change.name, change.value); + + // Apply changes from SETTINGS clause, with validation. + user_format_settings.applyChanges(args.storage_def->settings->changes); + format_settings = getFormatSettings(args.getContext(), user_format_settings); + } + else + { + format_settings = getFormatSettings(args.getContext()); } - // Apply changes from SETTINGS clause, with validation. 
- user_format_settings.applyChanges(args.storage_def->settings->changes); - format_settings = getFormatSettings(args.getContext(), user_format_settings); - } - else + ASTPtr partition_by; + if (args.storage_def->partition_by) + partition_by = args.storage_def->partition_by->clone(); + + return std::make_shared( + std::move(configuration), + args.getContext(), + args.table_id, + args.columns, + args.constraints, + args.comment, + format_settings, + /* distributed_processing_ */ false, + partition_by); + }, { - format_settings = getFormatSettings(args.getContext()); - } - - ASTPtr partition_by; - if (args.storage_def->partition_by) - partition_by = args.storage_def->partition_by->clone(); - - return std::make_shared( - std::move(configuration), - args.getContext(), - args.table_id, - args.columns, - args.constraints, - args.comment, - format_settings, - /* distributed_processing_ */false, - partition_by); - }, - { - .supports_settings = true, - .supports_sort_order = true, // for partition by - .supports_schema_inference = true, - .source_access_type = AccessType::S3, - }); + .supports_settings = true, + .supports_sort_order = true, // for partition by + .supports_schema_inference = true, + .source_access_type = AccessType::S3, + }); } void registerStorageS3(StorageFactory & factory) @@ -1689,7 +1719,8 @@ bool StorageS3::supportsPartitionBy() const SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx) { - static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_s3", DEFAULT_SCHEMA_CACHE_ELEMENTS)); + static SchemaCache schema_cache( + ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_s3", DEFAULT_SCHEMA_CACHE_ELEMENTS)); return schema_cache; } @@ -1716,14 +1747,15 @@ std::optional StorageS3::tryGetColumnsFromCache( /// but schema cache will handle this case and won't return columns from cache /// because we can't say that it's valid without last modification time. last_modification_time = S3::getObjectInfo( - *configuration.client, - configuration.url.bucket, - (*it)->key, - configuration.url.version_id, - configuration.request_settings, - /*with_metadata=*/ false, - /*for_disk_s3=*/ false, - /*throw_on_error= */ false).last_modification_time; + *configuration.client, + configuration.url.bucket, + (*it)->key, + configuration.url.version_id, + configuration.request_settings, + /*with_metadata=*/false, + /*for_disk_s3=*/false, + /*throw_on_error= */ false) + .last_modification_time; } return last_modification_time ? 
std::make_optional(last_modification_time) : std::nullopt; @@ -1748,10 +1780,11 @@ void StorageS3::addColumnsToCache( const std::optional & format_settings, const ContextPtr & ctx) { - auto host_and_bucket = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket; + auto host_and_bucket + = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket; Strings sources; sources.reserve(keys.size()); - std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem->key; }); + std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem) { return host_and_bucket / elem->key; }); auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx); auto & schema_cache = getSchemaCache(ctx); schema_cache.addManyColumns(cache_keys, columns); diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 3f35c578e19..62bbccc86a0 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -231,6 +231,7 @@ private: String getPath() const { return fs::path(bucket) / key_with_info->key; } const String & getFile() const { return key_with_info->key; } const KeyWithInfo & getKeyWithInfo() const { return *key_with_info; } + std::optional getFileSize() const { return key_with_info->info ? std::optional(key_with_info->info->size) : std::nullopt; } const IInputFormat * getInputFormat() const { return dynamic_cast(source.get()); } diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp index 824dae6bc3e..702b1f14ae7 100644 --- a/src/Storages/StorageS3Cluster.cpp +++ b/src/Storages/StorageS3Cluster.cpp @@ -61,7 +61,7 @@ StorageS3Cluster::StorageS3Cluster( storage_metadata.setConstraints(constraints_); setInMemoryMetadata(storage_metadata); - virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); + virtual_columns = VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); } void StorageS3Cluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 724574e9515..a485215f72f 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -140,7 +140,7 @@ IStorageURLBase::IStorageURLBase( storage_metadata.setComment(comment); setInMemoryMetadata(storage_metadata); - virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); + virtual_columns = VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); } @@ -308,12 +308,10 @@ StorageURLSource::StorageURLSource( curr_uri = uri_and_buf.first; auto last_mod_time = uri_and_buf.second->tryGetLastModificationTime(); read_buf = std::move(uri_and_buf.second); + current_file_size = tryGetFileSizeFromReadBuffer(*read_buf); if (auto file_progress_callback = getContext()->getFileProgressCallback()) - { - size_t file_size = tryGetFileSizeFromReadBuffer(*read_buf).value_or(0); - file_progress_callback(FileProgress(0, file_size)); - } + file_progress_callback(FileProgress(0, current_file_size.value_or(0))); QueryPipelineBuilder builder; std::optional num_rows_from_cache = std::nullopt; @@ -401,7 +399,7 @@ Chunk 
StorageURLSource::generate() if (input_format) chunk_size = input_format->getApproxBytesReadForChunk(); progress(num_rows, chunk_size ? chunk_size : chunk.bytes()); - VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, curr_uri.getPath()); + VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(chunk, requested_virtual_columns, curr_uri.getPath(), current_file_size); return chunk; } diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h index 6306d16742e..f3758f92468 100644 --- a/src/Storages/StorageURL.h +++ b/src/Storages/StorageURL.h @@ -225,6 +225,7 @@ private: Block block_for_format; std::shared_ptr uri_iterator; Poco::URI curr_uri; + std::optional current_file_size; String format; const std::optional & format_settings; HTTPHeaderEntries headers; diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 5c2108bef33..c052e781877 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -69,7 +69,7 @@ StorageURLCluster::StorageURLCluster( storage_metadata.setConstraints(constraints_); setInMemoryMetadata(storage_metadata); - virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); + virtual_columns = VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList()); } void StorageURLCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) diff --git a/src/Storages/VirtualColumnUtils.cpp b/src/Storages/VirtualColumnUtils.cpp index 219043f25c6..88cbdf045c3 100644 --- a/src/Storages/VirtualColumnUtils.cpp +++ b/src/Storages/VirtualColumnUtils.cpp @@ -350,11 +350,12 @@ void filterBlockWithQuery(const ASTPtr & query, Block & block, ContextPtr contex } } -NamesAndTypesList getPathAndFileVirtualsForStorage(NamesAndTypesList storage_columns) +NamesAndTypesList getPathFileAndSizeVirtualsForStorage(NamesAndTypesList storage_columns) { auto default_virtuals = NamesAndTypesList{ {"_path", std::make_shared(std::make_shared())}, - {"_file", std::make_shared(std::make_shared())}}; + {"_file", std::make_shared(std::make_shared())}, + {"_size", makeNullable(std::make_shared())}}; default_virtuals.sort(); storage_columns.sort(); @@ -420,8 +421,8 @@ ColumnPtr getFilterByPathAndFileIndexes(const std::vector & paths, const return block.getByName("_idx").column; } -void addRequestedPathAndFileVirtualsToChunk( - Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path, const String * filename) +void addRequestedPathFileAndSizeVirtualsToChunk( + Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path, std::optional size, const String * filename) { for (const auto & virtual_column : requested_virtual_columns) { @@ -442,6 +443,13 @@ void addRequestedPathAndFileVirtualsToChunk( chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), filename_from_path)->convertToFullColumnIfConst()); } } + else if (virtual_column.name == "_size") + { + if (size) + chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), *size)->convertToFullColumnIfConst()); + else + chunk.addColumn(virtual_column.type->createColumnConstWithDefaultValue(chunk.getNumRows())->convertToFullColumnIfConst()); + } } } diff --git a/src/Storages/VirtualColumnUtils.h b/src/Storages/VirtualColumnUtils.h index a21f2b05552..f5d73be5fb8 100644 --- 
a/src/Storages/VirtualColumnUtils.h +++ b/src/Storages/VirtualColumnUtils.h @@ -47,7 +47,7 @@ auto extractSingleValueFromBlock(const Block & block, const String & name) return res; } -NamesAndTypesList getPathAndFileVirtualsForStorage(NamesAndTypesList storage_columns); +NamesAndTypesList getPathFileAndSizeVirtualsForStorage(NamesAndTypesList storage_columns); ASTPtr createPathAndFileFilterAst(const ASTPtr & query, const NamesAndTypesList & virtual_columns, const String & path_example, const ContextPtr & context); @@ -68,8 +68,8 @@ void filterByPathOrFile(std::vector & sources, const std::vector & pa sources = std::move(filtered_sources); } -void addRequestedPathAndFileVirtualsToChunk( - Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path, const String * filename = nullptr); +void addRequestedPathFileAndSizeVirtualsToChunk( + Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path, std::optional size, const String * filename = nullptr); } } diff --git a/tests/queries/0_stateless/02921_file_engine_size_virtual_column.reference b/tests/queries/0_stateless/02921_file_engine_size_virtual_column.reference new file mode 100644 index 00000000000..2f319dfb812 --- /dev/null +++ b/tests/queries/0_stateless/02921_file_engine_size_virtual_column.reference @@ -0,0 +1,12 @@ +2 +3 +4 +2 +3 +4 +2 +3 +4 +2 +3 +4 diff --git a/tests/queries/0_stateless/02921_file_engine_size_virtual_column.sh b/tests/queries/0_stateless/02921_file_engine_size_virtual_column.sh new file mode 100755 index 00000000000..5dd58ec0d7f --- /dev/null +++ b/tests/queries/0_stateless/02921_file_engine_size_virtual_column.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CURDIR"/../shell_config.sh + +echo "1" > $CLICKHOUSE_TEST_UNIQUE_NAME.data1.tsv +echo "12" > $CLICKHOUSE_TEST_UNIQUE_NAME.data2.tsv +echo "123" > $CLICKHOUSE_TEST_UNIQUE_NAME.data3.tsv + +$CLICKHOUSE_LOCAL -q "select _size from file('$CLICKHOUSE_TEST_UNIQUE_NAME.data{1,2,3}.tsv') order by _size" +# Run this query twice to check correct behaviour when cache is used +$CLICKHOUSE_LOCAL -q "select _size from file('$CLICKHOUSE_TEST_UNIQUE_NAME.data{1,2,3}.tsv') order by _size" + +# Test the same fils in archive +tar -cf $CLICKHOUSE_TEST_UNIQUE_NAME.archive.tar $CLICKHOUSE_TEST_UNIQUE_NAME.data1.tsv $CLICKHOUSE_TEST_UNIQUE_NAME.data2.tsv $CLICKHOUSE_TEST_UNIQUE_NAME.data3.tsv + +$CLICKHOUSE_LOCAL -q "select _size from file('$CLICKHOUSE_TEST_UNIQUE_NAME.archive.tar :: $CLICKHOUSE_TEST_UNIQUE_NAME.data{1,2,3}.tsv') order by _size" +$CLICKHOUSE_LOCAL -q "select _size from file('$CLICKHOUSE_TEST_UNIQUE_NAME.archive.tar :: $CLICKHOUSE_TEST_UNIQUE_NAME.data{1,2,3}.tsv') order by _size" + +rm $CLICKHOUSE_TEST_UNIQUE_NAME.* + diff --git a/tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.reference b/tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.reference new file mode 100644 index 00000000000..369837adcbb --- /dev/null +++ b/tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.reference @@ -0,0 +1,12 @@ +a.tsv 24 +b.tsv 33 +c.tsv 33 +a.tsv 24 +b.tsv 33 +c.tsv 33 +a.tsv 24 +b.tsv 33 +c.tsv 33 +a.tsv 24 +b.tsv 33 +c.tsv 33 diff --git a/tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.sh b/tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.sh new file mode 100755 index 00000000000..57f38719f8b --- /dev/null +++ b/tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +$CLICKHOUSE_LOCAL -q "select _file, _size from url('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file" +$CLICKHOUSE_LOCAL -q "select _file, _size from url('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file" + +$CLICKHOUSE_LOCAL -q "select _file, _size from s3('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file" +$CLICKHOUSE_LOCAL -q "select _file, _size from s3('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file" + diff --git a/tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.reference b/tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.reference new file mode 100644 index 00000000000..bc42121fb39 --- /dev/null +++ b/tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.reference @@ -0,0 +1,6 @@ +2 +3 +4 +2 +3 +4 diff --git a/tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.sh b/tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.sh new file mode 100755 index 00000000000..06d7b7717c1 --- /dev/null +++ b/tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash +# Tags: no-fasttest, use-hdfs + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CURDIR"/../shell_config.sh + +$CLICKHOUSE_LOCAL -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data1.tsv') select 1 settings hdfs_truncate_on_insert=1;" +$CLICKHOUSE_LOCAL -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data2.tsv') select 11 settings hdfs_truncate_on_insert=1;" +$CLICKHOUSE_LOCAL -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data3.tsv') select 111 settings hdfs_truncate_on_insert=1;" + + +$CLICKHOUSE_LOCAL -q "select _size from hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data*.tsv', auto, 'x UInt64') order by _size" +$CLICKHOUSE_LOCAL -q "select _size from hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data*.tsv', auto, 'x UInt64') order by _size" + From 193f8a5d2365d86d0e38dd8fbfdac59b8a38e986 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 22 Nov 2023 18:12:50 +0000 Subject: [PATCH 2/7] Add test for azure --- .../test_storage_azure_blob_storage/test.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index e0365f70e7f..d8f29793fd2 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -1156,3 +1156,36 @@ def test_filtering_by_file_or_path(cluster): ) assert int(result) == 1 + + +def test_size_virtual_column(cluster): + node = cluster.instances["node"] + storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"] + azure_query( + node, + f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}','cont', 'test_size_virtual_column1.tsv', 'devstoreaccount1', " + f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 1", + ) + + azure_query( + node, + f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}','cont', 'test_size_virtual_column2.tsv', 'devstoreaccount1', " + f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 11", + ) + + azure_query( + node, + f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_size_virtual_column3.tsv', 'devstoreaccount1', " + f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 111", + ) + + result = node.query( + f"select _file, _size from azureBlobStorage('{storage_account_url}', 'cont', 'test_size_virtual_column*.tsv', 'devstoreaccount1', " + f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') " + f"order by _file" + ) + + assert ( + result + == "test_size_virtual_column1.tsv\t2\ntest_size_virtual_column2.tsv\t3\ntest_size_virtual_column3.tsv\t4\n" + ) From 6b64efb55a38c1e65052b6babaddc67d743d26bd Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 22 Nov 2023 18:21:30 +0000 Subject: [PATCH 3/7] Add docs --- .../table-engines/integrations/azureBlobStorage.md | 6 ++++++ docs/en/engines/table-engines/integrations/hdfs.md | 5 +++-- docs/en/engines/table-engines/integrations/s3.md | 5 +++-- docs/en/engines/table-engines/special/file.md | 8 +++++++- docs/en/engines/table-engines/special/url.md | 6 ++++++ .../en/sql-reference/table-functions/azureBlobStorage.md | 6 ++++++ docs/en/sql-reference/table-functions/file.md 
| 9 +++++---- docs/en/sql-reference/table-functions/hdfs.md | 5 +++-- docs/en/sql-reference/table-functions/s3.md | 6 ++++++ docs/en/sql-reference/table-functions/url.md | 5 +++-- 10 files changed, 48 insertions(+), 13 deletions(-) diff --git a/docs/en/engines/table-engines/integrations/azureBlobStorage.md b/docs/en/engines/table-engines/integrations/azureBlobStorage.md index 3df08ee2ffb..c6525121667 100644 --- a/docs/en/engines/table-engines/integrations/azureBlobStorage.md +++ b/docs/en/engines/table-engines/integrations/azureBlobStorage.md @@ -47,6 +47,12 @@ SELECT * FROM test_table; └──────┴───────┘ ``` +## Virtual columns {#virtual-columns} + +- `_path` — Path to the file. Type: `LowCardinalty(String)`. +- `_file` — Name of the file. Type: `LowCardinalty(String)`. +- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`. + ## See also [Azure Blob Storage Table Function](/docs/en/sql-reference/table-functions/azureBlobStorage) diff --git a/docs/en/engines/table-engines/integrations/hdfs.md b/docs/en/engines/table-engines/integrations/hdfs.md index c677123a8d0..19221c256f9 100644 --- a/docs/en/engines/table-engines/integrations/hdfs.md +++ b/docs/en/engines/table-engines/integrations/hdfs.md @@ -230,8 +230,9 @@ libhdfs3 support HDFS namenode HA. ## Virtual Columns {#virtual-columns} -- `_path` — Path to the file. -- `_file` — Name of the file. +- `_path` — Path to the file. Type: `LowCardinalty(String)`. +- `_file` — Name of the file. Type: `LowCardinalty(String)`. +- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`. ## Storage Settings {#storage-settings} diff --git a/docs/en/engines/table-engines/integrations/s3.md b/docs/en/engines/table-engines/integrations/s3.md index 2967a15494c..3144bdd32fa 100644 --- a/docs/en/engines/table-engines/integrations/s3.md +++ b/docs/en/engines/table-engines/integrations/s3.md @@ -142,8 +142,9 @@ Code: 48. DB::Exception: Received from localhost:9000. DB::Exception: Reading fr ## Virtual columns {#virtual-columns} -- `_path` — Path to the file. -- `_file` — Name of the file. +- `_path` — Path to the file. Type: `LowCardinalty(String)`. +- `_file` — Name of the file. Type: `LowCardinalty(String)`. +- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`. For more information about virtual columns see [here](../../../engines/table-engines/index.md#table_engines-virtual_columns). diff --git a/docs/en/engines/table-engines/special/file.md b/docs/en/engines/table-engines/special/file.md index 27945b30c03..6e3897398a5 100644 --- a/docs/en/engines/table-engines/special/file.md +++ b/docs/en/engines/table-engines/special/file.md @@ -87,12 +87,18 @@ $ echo -e "1,2\n3,4" | clickhouse-local -q "CREATE TABLE table (a Int64, b Int64 - Indices - Replication -## PARTITION BY +## PARTITION BY {#partition-by} `PARTITION BY` — Optional. It is possible to create separate files by partitioning the data on a partition key. In most cases, you don't need a partition key, and if it is needed you generally don't need a partition key more granular than by month. Partitioning does not speed up queries (in contrast to the ORDER BY expression). You should never use too granular partitioning. Don't partition your data by client identifiers or names (instead, make client identifier or name the first column in the ORDER BY expression). 
For partitioning by month, use the `toYYYYMM(date_column)` expression, where `date_column` is a column with a date of the type [Date](/docs/en/sql-reference/data-types/date.md). The partition names here have the `"YYYYMM"` format. +## Virtual Columns {#virtual-columns} + +- `_path` — Path to the file. Type: `LowCardinalty(String)`. +- `_file` — Name of the file. Type: `LowCardinalty(String)`. +- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`. + ## Settings {#settings} - [engine_file_empty_if_not_exists](/docs/en/operations/settings/settings.md#engine-file-emptyif-not-exists) - allows to select empty data from a file that doesn't exist. Disabled by default. diff --git a/docs/en/engines/table-engines/special/url.md b/docs/en/engines/table-engines/special/url.md index 5a5e1564180..f6183a779ae 100644 --- a/docs/en/engines/table-engines/special/url.md +++ b/docs/en/engines/table-engines/special/url.md @@ -103,6 +103,12 @@ SELECT * FROM url_engine_table For partitioning by month, use the `toYYYYMM(date_column)` expression, where `date_column` is a column with a date of the type [Date](/docs/en/sql-reference/data-types/date.md). The partition names here have the `"YYYYMM"` format. +## Virtual Columns {#virtual-columns} + +- `_path` — Path to the `URL`. Type: `LowCardinalty(String)`. +- `_file` — Resource name of the `URL`. Type: `LowCardinalty(String)`. +- `_size` — Size of the resource in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`. + ## Storage Settings {#storage-settings} - [engine_url_skip_empty_files](/docs/en/operations/settings/settings.md#engine_url_skip_empty_files) - allows to skip empty files while reading. Disabled by default. diff --git a/docs/en/sql-reference/table-functions/azureBlobStorage.md b/docs/en/sql-reference/table-functions/azureBlobStorage.md index 59c92e1327e..1510489ce83 100644 --- a/docs/en/sql-reference/table-functions/azureBlobStorage.md +++ b/docs/en/sql-reference/table-functions/azureBlobStorage.md @@ -67,6 +67,12 @@ SELECT count(*) FROM azureBlobStorage('DefaultEndpointsProtocol=https;AccountNam └─────────┘ ``` +## Virtual Columns {#virtual-columns} + +- `_path` — Path to the file. Type: `LowCardinalty(String)`. +- `_file` — Name of the file. Type: `LowCardinalty(String)`. +- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the file size is unknown, the value is `NULL`. + **See Also** - [AzureBlobStorage Table Engine](/docs/en/engines/table-engines/integrations/azureBlobStorage.md) diff --git a/docs/en/sql-reference/table-functions/file.md b/docs/en/sql-reference/table-functions/file.md index a871bdaafa9..ad1feb87c60 100644 --- a/docs/en/sql-reference/table-functions/file.md +++ b/docs/en/sql-reference/table-functions/file.md @@ -191,12 +191,13 @@ Query the total number of rows from all files `file002` inside any folder in dir SELECT count(*) FROM file('big_dir/**/file002', 'CSV', 'name String, value UInt32'); ``` -## Virtual Columns +## Virtual Columns {#virtual-columns} -- `_path` — Path to the file. -- `_file` — Name of the file. +- `_path` — Path to the file. Type: `LowCardinalty(String)`. +- `_file` — Name of the file. Type: `LowCardinalty(String)`. +- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the file size is unknown, the value is `NULL`. 
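A minimal usage sketch for these virtuals (the glob, format and column structure below are illustrative and not taken from this patch or its tests):

```sql
-- Illustrative only: assumes a few local TSV files matching the glob exist.
SELECT _path, _file, _size
FROM file('example_*.tsv', 'TSV', 'x UInt64')
ORDER BY _size;
```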
-## Settings +## Settings {#settings} - [engine_file_empty_if_not_exists](/docs/en/operations/settings/settings.md#engine-file-emptyif-not-exists) - allows to select empty data from a file that doesn't exist. Disabled by default. - [engine_file_truncate_on_insert](/docs/en/operations/settings/settings.md#engine-file-truncate-on-insert) - allows to truncate file before insert into it. Disabled by default. diff --git a/docs/en/sql-reference/table-functions/hdfs.md b/docs/en/sql-reference/table-functions/hdfs.md index 678470e9150..31780e30e8e 100644 --- a/docs/en/sql-reference/table-functions/hdfs.md +++ b/docs/en/sql-reference/table-functions/hdfs.md @@ -94,8 +94,9 @@ FROM hdfs('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name Strin ## Virtual Columns -- `_path` — Path to the file. -- `_file` — Name of the file. +- `_path` — Path to the file. Type: `LowCardinalty(String)`. +- `_file` — Name of the file. Type: `LowCardinalty(String)`. +- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`. ## Storage Settings {#storage-settings} diff --git a/docs/en/sql-reference/table-functions/s3.md b/docs/en/sql-reference/table-functions/s3.md index 8649295e815..e0c6c08b9a4 100644 --- a/docs/en/sql-reference/table-functions/s3.md +++ b/docs/en/sql-reference/table-functions/s3.md @@ -224,6 +224,12 @@ FROM s3( LIMIT 5; ``` +## Virtual Columns {#virtual-columns} + +- `_path` — Path to the file. Type: `LowCardinalty(String)`. +- `_file` — Name of the file. Type: `LowCardinalty(String)`. +- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the file size is unknown, the value is `NULL`. + ## Storage Settings {#storage-settings} - [s3_truncate_on_insert](/docs/en/operations/settings/settings.md#s3-truncate-on-insert) - allows to truncate file before insert into it. Disabled by default. diff --git a/docs/en/sql-reference/table-functions/url.md b/docs/en/sql-reference/table-functions/url.md index 859de86f019..4dc6e435b50 100644 --- a/docs/en/sql-reference/table-functions/url.md +++ b/docs/en/sql-reference/table-functions/url.md @@ -50,8 +50,9 @@ Character `|` inside patterns is used to specify failover addresses. They are it ## Virtual Columns -- `_path` — Path to the `URL`. -- `_file` — Resource name of the `URL`. +- `_path` — Path to the `URL`. Type: `LowCardinalty(String)`. +- `_file` — Resource name of the `URL`. Type: `LowCardinalty(String)`. +- `_size` — Size of the resource in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`. 
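As a sketch of how the same column can be read over HTTP (the host, file names and structure here are placeholders, not a real endpoint):

```sql
-- Illustrative only: host and file names are placeholders.
SELECT _file, _size
FROM url('https://example.com/data/{a,b,c}.tsv', 'TSV', 'x UInt64')
ORDER BY _file;
```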
## Storage Settings {#storage-settings} From 4a86f4a7b91df52817b9eb5df5052fc689fdd5b2 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 22 Nov 2023 18:24:34 +0000 Subject: [PATCH 4/7] Fix style changes --- src/Storages/StorageS3.cpp | 691 ++++++++++++++++++------------------- 1 file changed, 329 insertions(+), 362 deletions(-) diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index a00f28a733f..93f288fd874 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -1,90 +1,90 @@ +#include "config.h" #include #include "Parsers/ASTCreateQuery.h" -#include "config.h" #if USE_AWS_S3 -# include +#include -# include -# include -# include -# include +#include +#include +#include +#include -# include -# include +#include +#include -# include -# include +#include +#include -# include -# include -# include -# include -# include -# include -# include -# include -# include +#include +#include +#include +#include +#include +#include +#include +#include +#include -# include -# include -# include +#include +#include +#include -# include -# include +#include +#include -# include -# include +#include +#include -# include -# include -# include -# include -# include +#include +#include +#include +#include +#include -# include +#include -# include +#include -# include +#include -# include -# include -# include -# include +#include +#include +#include +#include -# include -# include -# include -# include +#include +#include +#include +#include -# include +#include -# ifdef __clang__ -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant" -# endif -# include -# ifdef __clang__ -# pragma clang diagnostic pop -# endif +#ifdef __clang__ +# pragma clang diagnostic push +# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant" +#endif +#include +#ifdef __clang__ +# pragma clang diagnostic pop +#endif namespace fs = std::filesystem; namespace CurrentMetrics { -extern const Metric StorageS3Threads; -extern const Metric StorageS3ThreadsActive; -extern const Metric StorageS3ThreadsScheduled; + extern const Metric StorageS3Threads; + extern const Metric StorageS3ThreadsActive; + extern const Metric StorageS3ThreadsScheduled; } namespace ProfileEvents { -extern const Event S3DeleteObjects; -extern const Event S3ListObjects; -extern const Event EngineFileLikeReadFiles; + extern const Event S3DeleteObjects; + extern const Event S3ListObjects; + extern const Event EngineFileLikeReadFiles; } namespace DB @@ -93,36 +93,37 @@ namespace DB static const std::unordered_set required_configuration_keys = { "url", }; -static const std::unordered_set optional_configuration_keys - = {"format", - "compression", - "compression_method", - "structure", - "access_key_id", - "secret_access_key", - "filename", - "use_environment_credentials", - "max_single_read_retries", - "min_upload_part_size", - "upload_part_size_multiply_factor", - "upload_part_size_multiply_parts_count_threshold", - "max_single_part_upload_size", - "max_connections", - "expiration_window_seconds", - "no_sign_request"}; +static const std::unordered_set optional_configuration_keys = { + "format", + "compression", + "compression_method", + "structure", + "access_key_id", + "secret_access_key", + "filename", + "use_environment_credentials", + "max_single_read_retries", + "min_upload_part_size", + "upload_part_size_multiply_factor", + "upload_part_size_multiply_parts_count_threshold", + "max_single_part_upload_size", + "max_connections", + "expiration_window_seconds", + 
"no_sign_request" +}; namespace ErrorCodes { -extern const int CANNOT_PARSE_TEXT; -extern const int BAD_ARGUMENTS; -extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; -extern const int S3_ERROR; -extern const int UNEXPECTED_EXPRESSION; -extern const int DATABASE_ACCESS_DENIED; -extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; -extern const int NOT_IMPLEMENTED; -extern const int CANNOT_COMPILE_REGEXP; -extern const int FILE_DOESNT_EXIST; + extern const int CANNOT_PARSE_TEXT; + extern const int BAD_ARGUMENTS; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int S3_ERROR; + extern const int UNEXPECTED_EXPRESSION; + extern const int DATABASE_ACCESS_DENIED; + extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; + extern const int NOT_IMPLEMENTED; + extern const int CANNOT_COMPILE_REGEXP; + extern const int FILE_DOESNT_EXIST; } class IOutputFormat; @@ -147,8 +148,7 @@ public: , virtual_columns(virtual_columns_) , read_keys(read_keys_) , request_settings(request_settings_) - , list_objects_pool( - CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) + , list_objects_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) , list_objects_scheduler(threadPoolCallbackRunner(list_objects_pool, "ListObjects")) , file_progress_callback(file_progress_callback_) { @@ -174,8 +174,8 @@ public: matcher = std::make_unique(makeRegexpPatternFromGlobs(globbed_uri.key)); if (!matcher->ok()) - throw Exception( - ErrorCodes::CANNOT_COMPILE_REGEXP, "Cannot compile regex from glob ({}): {}", globbed_uri.key, matcher->error()); + throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP, + "Cannot compile regex from glob ({}): {}", globbed_uri.key, matcher->error()); recursive = globbed_uri.key == "/**" ? 
true : false; fillInternalBufferAssumeLocked(); @@ -187,9 +187,15 @@ public: return nextAssumeLocked(); } - size_t objectsCount() { return buffer.size(); } + size_t objectsCount() + { + return buffer.size(); + } - ~Impl() { list_objects_pool.wait(); } + ~Impl() + { + list_objects_pool.wait(); + } private: using ListObjectsOutcome = Aws::S3::Model::ListObjectsV2Outcome; @@ -245,13 +251,9 @@ private: if (!outcome.IsSuccess()) { - throw S3Exception( - outcome.GetError().GetErrorType(), - "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}", - quoteString(request.GetBucket()), - quoteString(request.GetPrefix()), - backQuote(outcome.GetError().GetExceptionName()), - quoteString(outcome.GetError().GetMessage())); + throw S3Exception(outcome.GetError().GetErrorType(), "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}", + quoteString(request.GetBucket()), quoteString(request.GetPrefix()), + backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage())); } const auto & result_batch = outcome.GetResult().GetContents(); @@ -278,7 +280,8 @@ private: String key = row.GetKey(); if (recursive || re2::RE2::FullMatch(key, *matcher)) { - S3::ObjectInfo info = { + S3::ObjectInfo info = + { .size = size_t(row.GetSize()), .last_modification_time = row.GetLastModified().Millis() / 1000, }; @@ -295,8 +298,7 @@ private: if (!is_initialized) { - filter_ast = VirtualColumnUtils::createPathAndFileFilterAst( - query, virtual_columns, fs::path(globbed_uri.bucket) / temp_buffer.front()->key, getContext()); + filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(globbed_uri.bucket) / temp_buffer.front()->key, getContext()); is_initialized = true; } @@ -313,8 +315,10 @@ private: buffer = std::move(temp_buffer); if (file_progress_callback) + { for (const auto & key_with_info : buffer) file_progress_callback(FileProgress(0, key_with_info->info->size)); + } /// Set iterator only after the whole batch is processed buffer_iter = buffer.begin(); @@ -325,19 +329,17 @@ private: std::future listObjectsAsync() { - return list_objects_scheduler( - [this] - { - ProfileEvents::increment(ProfileEvents::S3ListObjects); - auto outcome = client->ListObjectsV2(request); + return list_objects_scheduler([this] + { + ProfileEvents::increment(ProfileEvents::S3ListObjects); + auto outcome = client->ListObjectsV2(request); - /// Outcome failure will be handled on the caller side. - if (outcome.IsSuccess()) - request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); + /// Outcome failure will be handled on the caller side. 
+ if (outcome.IsSuccess()) + request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); - return outcome; - }, - Priority{}); + return outcome; + }, Priority{}); } std::mutex mutex; @@ -374,8 +376,7 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator( KeysWithInfo * read_keys_, const S3Settings::RequestSettings & request_settings_, std::function file_progress_callback_) - : pimpl(std::make_shared( - client_, globbed_uri_, query, virtual_columns_, context, read_keys_, request_settings_, file_progress_callback_)) + : pimpl(std::make_shared(client_, globbed_uri_, query, virtual_columns_, context, read_keys_, request_settings_, file_progress_callback_)) { } @@ -428,8 +429,10 @@ public: } if (read_keys_) + { for (const auto & key : keys) read_keys_->push_back(std::make_shared(key)); + } } KeyWithInfoPtr next() @@ -448,7 +451,10 @@ public: return std::make_shared(key, info); } - size_t objectsCount() { return keys.size(); } + size_t objectsCount() + { + return keys.size(); + } private: Strings keys; @@ -474,7 +480,8 @@ StorageS3Source::KeysIterator::KeysIterator( KeysWithInfo * read_keys, std::function file_progress_callback_) : pimpl(std::make_shared( - client_, version_id_, keys_, bucket_, request_settings_, query, virtual_columns_, context, read_keys, file_progress_callback_)) + client_, version_id_, keys_, bucket_, request_settings_, + query, virtual_columns_, context, read_keys, file_progress_callback_)) { } @@ -488,13 +495,12 @@ size_t StorageS3Source::KeysIterator::estimatedKeysCount() return pimpl->objectsCount(); } -StorageS3Source::ReadTaskIterator::ReadTaskIterator(const DB::ReadTaskCallback & callback_, size_t max_threads_count) : callback(callback_) +StorageS3Source::ReadTaskIterator::ReadTaskIterator( + const DB::ReadTaskCallback & callback_, + size_t max_threads_count) + : callback(callback_) { - ThreadPool pool( - CurrentMetrics::StorageS3Threads, - CurrentMetrics::StorageS3ThreadsActive, - CurrentMetrics::StorageS3ThreadsScheduled, - max_threads_count); + ThreadPool pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, max_threads_count); auto pool_scheduler = threadPoolCallbackRunner(pool, "S3ReadTaskItr"); std::vector> keys; @@ -559,8 +565,7 @@ StorageS3Source::StorageS3Source( , file_iterator(file_iterator_) , max_parsing_threads(max_parsing_threads_) , need_only_count(need_only_count_) - , create_reader_pool( - CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) + , create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) , create_reader_scheduler(threadPoolCallbackRunner(create_reader_pool, "CreateS3Reader")) { } @@ -587,14 +592,13 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader() if (!key_with_info->info) key_with_info->info = S3::getObjectInfo(*client, bucket, key_with_info->key, version_id, request_settings); - } while (getContext()->getSettingsRef().s3_skip_empty_files && key_with_info->info->size == 0); + } + while (getContext()->getSettingsRef().s3_skip_empty_files && key_with_info->info->size == 0); QueryPipelineBuilder builder; std::shared_ptr source; std::unique_ptr read_buf; - std::optional num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files - ? 
tryGetNumRowsFromCache(*key_with_info) - : std::nullopt; + std::optional num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(*key_with_info) : std::nullopt; if (num_rows_from_cache) { /// We should not return single chunk with all number of rows, @@ -643,7 +647,10 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader() /// Add ExtractColumnsTransform to extract requested columns/subcolumns /// from chunk read by IInputFormat. - builder.addSimpleTransform([&](const Block & header) { return std::make_shared(header, requested_columns); }); + builder.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, requested_columns); + }); auto pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder))); auto current_reader = std::make_unique(*pipeline); @@ -675,25 +682,18 @@ std::unique_ptr StorageS3Source::createS3ReadBuffer(const String & k } return std::make_unique( - client, - bucket, - key, - version_id, - request_settings, - read_settings, - /*use_external_buffer*/ false, - /*offset_*/ 0, - /*read_until_position_*/ 0, - /*restricted_seek_*/ false, - object_size); + client, bucket, key, version_id, request_settings, read_settings, + /*use_external_buffer*/ false, /*offset_*/ 0, /*read_until_position_*/ 0, + /*restricted_seek_*/ false, object_size); } -std::unique_ptr -StorageS3Source::createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size) +std::unique_ptr StorageS3Source::createAsyncS3ReadBuffer( + const String & key, const ReadSettings & read_settings, size_t object_size) { auto context = getContext(); auto read_buffer_creator = - [this, read_settings, object_size](const std::string & path, size_t read_until_position) -> std::unique_ptr + [this, read_settings, object_size] + (const std::string & path, size_t read_until_position) -> std::unique_ptr { return std::make_unique( client, @@ -702,10 +702,10 @@ StorageS3Source::createAsyncS3ReadBuffer(const String & key, const ReadSettings version_id, request_settings, read_settings, - /* use_external_buffer */ true, - /* offset */ 0, + /* use_external_buffer */true, + /* offset */0, read_until_position, - /* restricted_seek */ true, + /* restricted_seek */true, object_size); }; @@ -713,8 +713,7 @@ StorageS3Source::createAsyncS3ReadBuffer(const String & key, const ReadSettings std::move(read_buffer_creator), StoredObjects{StoredObject{key, object_size}}, read_settings, - /* cache_log */ nullptr, - /* use_external_buffer */ true); + /* cache_log */nullptr, /* use_external_buffer */true); auto modified_settings{read_settings}; /// FIXME: Changing this setting to default value breaks something around parquet reading @@ -722,7 +721,8 @@ StorageS3Source::createAsyncS3ReadBuffer(const String & key, const ReadSettings auto & pool_reader = context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); auto async_reader = std::make_unique( - std::move(s3_impl), pool_reader, modified_settings, context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog()); + std::move(s3_impl), pool_reader, modified_settings, + context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog()); async_reader->setReadUntilEnd(); if (read_settings.remote_fs_prefetch) @@ -798,7 +798,10 @@ std::optional StorageS3Source::tryGetNumRowsFromCache(const KeyWithInfo { String source = fs::path(url_host_and_port) / bucket / key_with_info.key; auto cache_key = 
getKeyForSchemaCache(source, format, format_settings, getContext()); - auto get_last_mod_time = [&]() -> std::optional { return key_with_info.info->last_modification_time; }; + auto get_last_mod_time = [&]() -> std::optional + { + return key_with_info.info->last_modification_time; + }; return StorageS3::getSchemaCache(getContext()).tryGetNumRows(cache_key, get_last_mod_time); } @@ -815,7 +818,9 @@ public: const StorageS3::Configuration & configuration_, const String & bucket, const String & key) - : SinkToStorage(sample_block_), sample_block(sample_block_), format_settings(format_settings_) + : SinkToStorage(sample_block_) + , sample_block(sample_block_) + , format_settings(format_settings_) { write_buf = wrapWriteBufferWithCompressionMethod( std::make_unique( @@ -829,7 +834,8 @@ public: context->getWriteSettings()), compression_method, 3); - writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings); + writer + = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings); } String getName() const override { return "StorageS3Sink"; } @@ -938,7 +944,15 @@ public: validateKey(partition_key); return std::make_shared( - format, sample_block, context, format_settings, compression_method, configuration, partition_bucket, partition_key); + format, + sample_block, + context, + format_settings, + compression_method, + configuration, + partition_bucket, + partition_key + ); } private: @@ -1013,8 +1027,7 @@ StorageS3::StorageS3( { /// We don't allow special columns in S3 storage. if (!columns_.hasOnlyOrdinary()) - throw Exception( - ErrorCodes::BAD_ARGUMENTS, "Table engine S3 doesn't support special columns like MATERIALIZED, ALIAS or EPHEMERAL"); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table engine S3 doesn't support special columns like MATERIALIZED, ALIAS or EPHEMERAL"); storage_metadata.setColumns(columns_); } @@ -1036,35 +1049,21 @@ std::shared_ptr StorageS3::createFileIterator( { if (distributed_processing) { - return std::make_shared( - local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads); + return std::make_shared(local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads); } else if (configuration.withGlobs()) { /// Iterate through disclosed globs and make a source for each file return std::make_shared( - *configuration.client, - configuration.url, - query, - virtual_columns, - local_context, - read_keys, - configuration.request_settings, - file_progress_callback); + *configuration.client, configuration.url, query, virtual_columns, + local_context, read_keys, configuration.request_settings, file_progress_callback); } else { return std::make_shared( - *configuration.client, - configuration.url.version_id, - configuration.keys, - configuration.url.bucket, - configuration.request_settings, - query, - virtual_columns, - local_context, - read_keys, - file_progress_callback); + *configuration.client, configuration.url.version_id, configuration.keys, + configuration.url.bucket, configuration.request_settings, query, + virtual_columns, local_context, read_keys, file_progress_callback); } } @@ -1100,13 +1099,7 @@ Pipe StorageS3::read( Pipes pipes; std::shared_ptr iterator_wrapper = createFileIterator( - query_configuration, - distributed_processing, - local_context, - query_info.query, - virtual_columns, - nullptr, - local_context->getFileProgressCallback()); + query_configuration, distributed_processing, 
local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback()); size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount(); if (estimated_keys_count > 1) @@ -1115,8 +1108,7 @@ Pipe StorageS3::read( /// Disclosed glob iterator can underestimate the amount of keys in some cases. We will keep one stream for this particular case. num_streams = 1; - auto read_from_format_info - = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals()); + auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals()); bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty()) && local_context->getSettingsRef().optimize_count_from_files; @@ -1149,8 +1141,7 @@ Pipe StorageS3::read( return Pipe::unitePipes(std::move(pipes)); } -SinkToStoragePtr -StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) +SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) { auto query_configuration = updateConfigurationAndGetCopy(local_context); @@ -1177,20 +1168,12 @@ StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snaps else { if (query_configuration.withGlobs()) - throw Exception( - ErrorCodes::DATABASE_ACCESS_DENIED, - "S3 key '{}' contains globs, so the table is in readonly mode", - query_configuration.url.key); + throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, + "S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key); bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert; - if (!truncate_in_insert - && S3::objectExists( - *query_configuration.client, - query_configuration.url.bucket, - query_configuration.keys.back(), - query_configuration.url.version_id, - query_configuration.request_settings)) + if (!truncate_in_insert && S3::objectExists(*query_configuration.client, query_configuration.url.bucket, query_configuration.keys.back(), query_configuration.url.version_id, query_configuration.request_settings)) { if (local_context->getSettingsRef().s3_create_new_file_on_insert) { @@ -1200,15 +1183,10 @@ StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snaps String new_key; do { - new_key - = first_key.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos)); + new_key = first_key.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos)); ++index; - } while (S3::objectExists( - *query_configuration.client, - query_configuration.url.bucket, - new_key, - query_configuration.url.version_id, - query_configuration.request_settings)); + } + while (S3::objectExists(*query_configuration.client, query_configuration.url.bucket, new_key, query_configuration.url.version_id, query_configuration.request_settings)); query_configuration.keys.push_back(new_key); configuration.keys.push_back(new_key); @@ -1220,8 +1198,7 @@ StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snaps "Object in bucket {} with key {} already exists. 
" "If you want to overwrite it, enable setting s3_truncate_on_insert, if you " "want to create a new file on each insert, enable setting s3_create_new_file_on_insert", - query_configuration.url.bucket, - query_configuration.keys.back()); + query_configuration.url.bucket, query_configuration.keys.back()); } } @@ -1368,12 +1345,10 @@ void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configur configuration.auth_settings.secret_access_key = collection.getOrDefault("secret_access_key", ""); configuration.auth_settings.use_environment_credentials = collection.getOrDefault("use_environment_credentials", 1); configuration.auth_settings.no_sign_request = collection.getOrDefault("no_sign_request", false); - configuration.auth_settings.expiration_window_seconds - = collection.getOrDefault("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS); + configuration.auth_settings.expiration_window_seconds = collection.getOrDefault("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS); configuration.format = collection.getOrDefault("format", configuration.format); - configuration.compression_method - = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); + configuration.compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); configuration.structure = collection.getOrDefault("structure", "auto"); configuration.request_settings = S3Settings::RequestSettings(collection); @@ -1403,10 +1378,9 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context /// with optional headers() function if (engine_args.empty() || engine_args.size() > 5) - throw Exception( - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, - "Storage S3 requires 1 to 5 arguments: " - "url, [NOSIGN | access_key_id, secret_access_key], name of used format and [compression_method]"); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Storage S3 requires 1 to 5 arguments: " + "url, [NOSIGN | access_key_id, secret_access_key], name of used format and [compression_method]"); auto * header_it = StorageURL::collectHeaders(engine_args, configuration.headers_from_ast, local_context); if (header_it != engine_args.end()) @@ -1416,8 +1390,11 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context); /// Size -> argument indexes - static std::unordered_map> size_to_engine_args{ - {1, {{}}}, {5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"compression_method", 4}}}}; + static std::unordered_map> size_to_engine_args + { + {1, {{}}}, + {5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"compression_method", 4}}} + }; std::unordered_map engine_args_to_idx; bool no_sign_request = false; @@ -1479,16 +1456,13 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context configuration.format = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["format"]], "format"); if (engine_args_to_idx.contains("compression_method")) - configuration.compression_method - = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["compression_method"]], "compression_method"); + configuration.compression_method = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["compression_method"]], "compression_method"); if (engine_args_to_idx.contains("access_key_id")) - configuration.auth_settings.access_key_id - = 
checkAndGetLiteralArgument(engine_args[engine_args_to_idx["access_key_id"]], "access_key_id"); + configuration.auth_settings.access_key_id = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["access_key_id"]], "access_key_id"); if (engine_args_to_idx.contains("secret_access_key")) - configuration.auth_settings.secret_access_key - = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["secret_access_key"]], "secret_access_key"); + configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["secret_access_key"]], "secret_access_key"); configuration.auth_settings.no_sign_request = no_sign_request; } @@ -1504,109 +1478,105 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context } ColumnsDescription StorageS3::getTableStructureFromData( - const StorageS3::Configuration & configuration, const std::optional & format_settings, ContextPtr ctx) + const StorageS3::Configuration & configuration, + const std::optional & format_settings, + ContextPtr ctx) { return getTableStructureFromDataImpl(configuration, format_settings, ctx); } namespace { -class ReadBufferIterator : public IReadBufferIterator, WithContext -{ -public: - ReadBufferIterator( - std::shared_ptr file_iterator_, - const StorageS3Source::KeysWithInfo & read_keys_, - const StorageS3::Configuration & configuration_, - const std::optional & format_settings_, - const ContextPtr & context_) - : WithContext(context_) - , file_iterator(file_iterator_) - , read_keys(read_keys_) - , configuration(configuration_) - , format_settings(format_settings_) - , prev_read_keys_size(read_keys_.size()) + class ReadBufferIterator : public IReadBufferIterator, WithContext { - } - - std::unique_ptr next() override - { - while (true) + public: + ReadBufferIterator( + std::shared_ptr file_iterator_, + const StorageS3Source::KeysWithInfo & read_keys_, + const StorageS3::Configuration & configuration_, + const std::optional & format_settings_, + const ContextPtr & context_) + : WithContext(context_) + , file_iterator(file_iterator_) + , read_keys(read_keys_) + , configuration(configuration_) + , format_settings(format_settings_) + , prev_read_keys_size(read_keys_.size()) { - current_key_with_info = (*file_iterator)(); + } - if (!current_key_with_info || current_key_with_info->key.empty()) + std::unique_ptr next() override + { + while (true) { - if (first) - throw Exception( - ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, - "Cannot extract table structure from {} format file, because there are no files with provided path " - "in S3 or all files are empty. You must specify table structure manually", - configuration.format); + current_key_with_info = (*file_iterator)(); - return nullptr; - } + if (!current_key_with_info || current_key_with_info->key.empty()) + { + if (first) + throw Exception( + ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, + "Cannot extract table structure from {} format file, because there are no files with provided path " + "in S3 or all files are empty. You must specify table structure manually", + configuration.format); - /// S3 file iterator could get new keys after new iteration, check them in schema cache. 
- if (getContext()->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size) - { - columns_from_cache = StorageS3::tryGetColumnsFromCache( - read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, getContext()); - prev_read_keys_size = read_keys.size(); - if (columns_from_cache) return nullptr; - } + } - if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info->info && current_key_with_info->info->size == 0) - continue; + /// S3 file iterator could get new keys after new iteration, check them in schema cache. + if (getContext()->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size) + { + columns_from_cache = StorageS3::tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, getContext()); + prev_read_keys_size = read_keys.size(); + if (columns_from_cache) + return nullptr; + } - int zstd_window_log_max = static_cast(getContext()->getSettingsRef().zstd_window_log_max); - auto impl = std::make_unique( - configuration.client, - configuration.url.bucket, - current_key_with_info->key, - configuration.url.version_id, - configuration.request_settings, - getContext()->getReadSettings()); - if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof()) - { - first = false; - return wrapReadBufferWithCompressionMethod( - std::move(impl), - chooseCompressionMethod(current_key_with_info->key, configuration.compression_method), - zstd_window_log_max); + if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info->info && current_key_with_info->info->size == 0) + continue; + + int zstd_window_log_max = static_cast(getContext()->getSettingsRef().zstd_window_log_max); + auto impl = std::make_unique(configuration.client, configuration.url.bucket, current_key_with_info->key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings()); + if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof()) + { + first = false; + return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info->key, configuration.compression_method), zstd_window_log_max); + } } } - } - std::optional getCachedColumns() override { return columns_from_cache; } + std::optional getCachedColumns() override + { + return columns_from_cache; + } - void setNumRowsToLastFile(size_t num_rows) override - { - if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3) - return; + void setNumRowsToLastFile(size_t num_rows) override + { + if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3) + return; - String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) - / configuration.url.bucket / current_key_with_info->key; - auto key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext()); - StorageS3::getSchemaCache(getContext()).addNumRows(key, num_rows); - } + String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info->key; + auto key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext()); + StorageS3::getSchemaCache(getContext()).addNumRows(key, num_rows); + } -private: - std::shared_ptr file_iterator; - const StorageS3Source::KeysWithInfo & read_keys; - const StorageS3::Configuration & configuration; - const 
std::optional & format_settings; - std::optional columns_from_cache; - StorageS3Source::KeyWithInfoPtr current_key_with_info; - size_t prev_read_keys_size; - bool first = true; -}; + private: + std::shared_ptr file_iterator; + const StorageS3Source::KeysWithInfo & read_keys; + const StorageS3::Configuration & configuration; + const std::optional & format_settings; + std::optional columns_from_cache; + StorageS3Source::KeyWithInfoPtr current_key_with_info; + size_t prev_read_keys_size; + bool first = true; + }; } ColumnsDescription StorageS3::getTableStructureFromDataImpl( - const Configuration & configuration, const std::optional & format_settings, ContextPtr ctx) + const Configuration & configuration, + const std::optional & format_settings, + ContextPtr ctx) { KeysWithInfo read_keys; @@ -1636,60 +1606,60 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( void registerStorageS3Impl(const String & name, StorageFactory & factory) { - factory.registerStorage( - name, - [](const StorageFactory::Arguments & args) + factory.registerStorage(name, [](const StorageFactory::Arguments & args) + { + auto & engine_args = args.engine_args; + if (engine_args.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); + + auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext()); + // Use format settings from global server context + settings from + // the SETTINGS clause of the create query. Settings from current + // session and user are ignored. + std::optional format_settings; + if (args.storage_def->settings) { - auto & engine_args = args.engine_args; - if (engine_args.empty()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); + FormatFactorySettings user_format_settings; - auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext()); - // Use format settings from global server context + settings from - // the SETTINGS clause of the create query. Settings from current - // session and user are ignored. - std::optional format_settings; - if (args.storage_def->settings) + // Apply changed settings from global context, but ignore the + // unknown ones, because we only have the format settings here. + const auto & changes = args.getContext()->getSettingsRef().changes(); + for (const auto & change : changes) { - FormatFactorySettings user_format_settings; - - // Apply changed settings from global context, but ignore the - // unknown ones, because we only have the format settings here. - const auto & changes = args.getContext()->getSettingsRef().changes(); - for (const auto & change : changes) - if (user_format_settings.has(change.name)) - user_format_settings.set(change.name, change.value); - - // Apply changes from SETTINGS clause, with validation. 
- user_format_settings.applyChanges(args.storage_def->settings->changes); - format_settings = getFormatSettings(args.getContext(), user_format_settings); - } - else - { - format_settings = getFormatSettings(args.getContext()); + if (user_format_settings.has(change.name)) + user_format_settings.set(change.name, change.value); } - ASTPtr partition_by; - if (args.storage_def->partition_by) - partition_by = args.storage_def->partition_by->clone(); - - return std::make_shared( - std::move(configuration), - args.getContext(), - args.table_id, - args.columns, - args.constraints, - args.comment, - format_settings, - /* distributed_processing_ */ false, - partition_by); - }, + // Apply changes from SETTINGS clause, with validation. + user_format_settings.applyChanges(args.storage_def->settings->changes); + format_settings = getFormatSettings(args.getContext(), user_format_settings); + } + else { - .supports_settings = true, - .supports_sort_order = true, // for partition by - .supports_schema_inference = true, - .source_access_type = AccessType::S3, - }); + format_settings = getFormatSettings(args.getContext()); + } + + ASTPtr partition_by; + if (args.storage_def->partition_by) + partition_by = args.storage_def->partition_by->clone(); + + return std::make_shared( + std::move(configuration), + args.getContext(), + args.table_id, + args.columns, + args.constraints, + args.comment, + format_settings, + /* distributed_processing_ */false, + partition_by); + }, + { + .supports_settings = true, + .supports_sort_order = true, // for partition by + .supports_schema_inference = true, + .source_access_type = AccessType::S3, + }); } void registerStorageS3(StorageFactory & factory) @@ -1719,8 +1689,7 @@ bool StorageS3::supportsPartitionBy() const SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx) { - static SchemaCache schema_cache( - ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_s3", DEFAULT_SCHEMA_CACHE_ELEMENTS)); + static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_s3", DEFAULT_SCHEMA_CACHE_ELEMENTS)); return schema_cache; } @@ -1747,15 +1716,14 @@ std::optional StorageS3::tryGetColumnsFromCache( /// but schema cache will handle this case and won't return columns from cache /// because we can't say that it's valid without last modification time. last_modification_time = S3::getObjectInfo( - *configuration.client, - configuration.url.bucket, - (*it)->key, - configuration.url.version_id, - configuration.request_settings, - /*with_metadata=*/false, - /*for_disk_s3=*/false, - /*throw_on_error= */ false) - .last_modification_time; + *configuration.client, + configuration.url.bucket, + (*it)->key, + configuration.url.version_id, + configuration.request_settings, + /*with_metadata=*/ false, + /*for_disk_s3=*/ false, + /*throw_on_error= */ false).last_modification_time; } return last_modification_time ? 
std::make_optional(last_modification_time) : std::nullopt; @@ -1780,11 +1748,10 @@ void StorageS3::addColumnsToCache( const std::optional & format_settings, const ContextPtr & ctx) { - auto host_and_bucket - = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket; + auto host_and_bucket = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket; Strings sources; sources.reserve(keys.size()); - std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem) { return host_and_bucket / elem->key; }); + std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem->key; }); auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx); auto & schema_cache = getSchemaCache(ctx); schema_cache.addManyColumns(cache_keys, columns); From 031f03010266db9969cd9d613edec7efe366d177 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 22 Nov 2023 18:27:32 +0000 Subject: [PATCH 5/7] Remove unused method --- src/Storages/StorageAzureBlob.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Storages/StorageAzureBlob.h b/src/Storages/StorageAzureBlob.h index a80abce99f1..b97dee0caed 100644 --- a/src/Storages/StorageAzureBlob.h +++ b/src/Storages/StorageAzureBlob.h @@ -331,7 +331,6 @@ private: const String & getRelativePath() const { return relative_path_with_metadata.relative_path; } const RelativePathWithMetadata & getRelativePathWithMetadata() const { return relative_path_with_metadata; } const IInputFormat * getInputFormat() const { return dynamic_cast(source.get()); } - ReadBuffer & getReadBuffer() const { return *read_buf; } private: RelativePathWithMetadata relative_path_with_metadata; From 70f45c11bdaa77777b085021dd0f687ea86e10a3 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 22 Nov 2023 19:56:42 +0000 Subject: [PATCH 6/7] Fix --- src/Storages/VirtualColumnUtils.cpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/Storages/VirtualColumnUtils.cpp b/src/Storages/VirtualColumnUtils.cpp index 51e81b00838..86ba7ee3a34 100644 --- a/src/Storages/VirtualColumnUtils.cpp +++ b/src/Storages/VirtualColumnUtils.cpp @@ -396,7 +396,10 @@ ASTPtr createPathAndFileFilterAst(const ASTPtr & query, const NamesAndTypesList Block block; for (const auto & column : virtual_columns) - block.insert({column.type->createColumn(), column.type, column.name}); + { + if (column.name == "_file" || column.name == "_path") + block.insert({column.type->createColumn(), column.type, column.name}); + } /// Create a block with one row to construct filter /// Append "idx" column as the filter result block.insert({ColumnUInt64::create(), std::make_shared(), "_idx"}); @@ -410,7 +413,10 @@ ColumnPtr getFilterByPathAndFileIndexes(const std::vector & paths, const { Block block; for (const auto & column : virtual_columns) - block.insert({column.type->createColumn(), column.type, column.name}); + { + if (column.name == "_file" || column.name == "_path") + block.insert({column.type->createColumn(), column.type, column.name}); + } block.insert({ColumnUInt64::create(), std::make_shared(), "_idx"}); for (size_t i = 0; i != paths.size(); ++i) From e3afb4182bbe6df12ddfc2a7c63be543ec76b285 Mon Sep 17 00:00:00 2001 From: avogar Date: Thu, 23 Nov 2023 11:10:41 +0000 Subject: [PATCH 7/7] Fix tests --- .../test_storage_azure_blob_storage/test.py | 5 +++-- 
 .../02922_url_s3_engine_size_virtual_column.sh | 8 ++++----
 .../02923_hdfs_engine_size_virtual_column.sh | 10 +++++-----
 3 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py
index d8f29793fd2..96fff6b891f 100644
--- a/tests/integration/test_storage_azure_blob_storage/test.py
+++ b/tests/integration/test_storage_azure_blob_storage/test.py
@@ -1179,10 +1179,11 @@ def test_size_virtual_column(cluster):
 f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 111",
 )
 
- result = node.query(
+ result = azure_query(
+ node,
 f"select _file, _size from azureBlobStorage('{storage_account_url}', 'cont', 'test_size_virtual_column*.tsv', 'devstoreaccount1', "
 f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') "
- f"order by _file"
+ f"order by _file",
 )
 
 assert (
diff --git a/tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.sh b/tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.sh
index 57f38719f8b..51de2117dca 100755
--- a/tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.sh
+++ b/tests/queries/0_stateless/02922_url_s3_engine_size_virtual_column.sh
@@ -5,9 +5,9 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 # shellcheck source=../shell_config.sh
 . "$CURDIR"/../shell_config.sh
 
-$CLICKHOUSE_LOCAL -q "select _file, _size from url('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file"
-$CLICKHOUSE_LOCAL -q "select _file, _size from url('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file"
+$CLICKHOUSE_CLIENT -q "select _file, _size from url('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file"
+$CLICKHOUSE_CLIENT -q "select _file, _size from url('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file"
 
-$CLICKHOUSE_LOCAL -q "select _file, _size from s3('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file"
-$CLICKHOUSE_LOCAL -q "select _file, _size from s3('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file"
+$CLICKHOUSE_CLIENT -q "select _file, _size from s3('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file"
+$CLICKHOUSE_CLIENT -q "select _file, _size from s3('http://localhost:11111/test/{a,b,c}.tsv', 'One') order by _file"
diff --git a/tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.sh b/tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.sh
index 06d7b7717c1..dc01687772f 100755
--- a/tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.sh
+++ b/tests/queries/0_stateless/02923_hdfs_engine_size_virtual_column.sh
@@ -5,11 +5,11 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 # shellcheck source=../shell_config.sh
 . "$CURDIR"/../shell_config.sh
 
-$CLICKHOUSE_LOCAL -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data1.tsv') select 1 settings hdfs_truncate_on_insert=1;"
-$CLICKHOUSE_LOCAL -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data2.tsv') select 11 settings hdfs_truncate_on_insert=1;"
-$CLICKHOUSE_LOCAL -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data3.tsv') select 111 settings hdfs_truncate_on_insert=1;"
+$CLICKHOUSE_CLIENT -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data1.tsv') select 1 settings hdfs_truncate_on_insert=1;"
+$CLICKHOUSE_CLIENT -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data2.tsv') select 11 settings hdfs_truncate_on_insert=1;"
+$CLICKHOUSE_CLIENT -q "insert into table function hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data3.tsv') select 111 settings hdfs_truncate_on_insert=1;"
 
-$CLICKHOUSE_LOCAL -q "select _size from hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data*.tsv', auto, 'x UInt64') order by _size"
-$CLICKHOUSE_LOCAL -q "select _size from hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data*.tsv', auto, 'x UInt64') order by _size"
+$CLICKHOUSE_CLIENT -q "select _size from hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data*.tsv', auto, 'x UInt64') order by _size"
+$CLICKHOUSE_CLIENT -q "select _size from hdfs('hdfs://localhost:12222/$CLICKHOUSE_TEST_UNIQUE_NAME.data*.tsv', auto, 'x UInt64') order by _size"