From 9628874ac1d08a02100673744077d7479f4f87ee Mon Sep 17 00:00:00 2001 From: avogar Date: Tue, 9 Apr 2024 10:40:40 +0000 Subject: [PATCH] Better --- src/Storages/HDFS/StorageHDFS.cpp | 4 ++-- src/Storages/StorageAzureBlob.cpp | 12 ++++++++---- src/Storages/StorageFile.cpp | 8 ++++---- src/Storages/StorageS3.cpp | 8 +++++--- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index 82403b8dacd..0a1e8e58ada 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -1092,7 +1092,7 @@ void ReadFromHDFS::initializePipeline(QueryPipelineBuilder & pipeline, const Bui SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_, bool /*async_insert*/) { - String current_uri = uris.back(); + String current_uri = uris.front(); bool has_wildcards = current_uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos; const auto * insert_query = dynamic_cast(query.get()); @@ -1114,7 +1114,7 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP if (is_path_with_globs) throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back()); - if (auto new_uri = checkFileExistsAndCreateNewKeyIfNeeded(context_, current_uri, uris.size())) + if (auto new_uri = checkFileExistsAndCreateNewKeyIfNeeded(context_, uris.front(), uris.size())) { uris.push_back(*new_uri); current_uri = *new_uri; diff --git a/src/Storages/StorageAzureBlob.cpp b/src/Storages/StorageAzureBlob.cpp index 083aec3b164..d5ce526d22d 100644 --- a/src/Storages/StorageAzureBlob.cpp +++ b/src/Storages/StorageAzureBlob.cpp @@ -868,8 +868,9 @@ void ReadFromAzureBlob::initializePipeline(QueryPipelineBuilder & pipeline, cons SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) { + auto path = configuration.blobs_paths.front(); auto sample_block = metadata_snapshot->getSampleBlock(); - auto chosen_compression_method = chooseCompressionMethod(configuration.blobs_paths.back(), configuration.compression_method); + auto chosen_compression_method = chooseCompressionMethod(path, configuration.compression_method); auto insert_query = std::dynamic_pointer_cast(query); auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr; @@ -885,7 +886,7 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta format_settings, chosen_compression_method, object_storage.get(), - configuration.blobs_paths.back()); + path); } else { @@ -893,8 +894,11 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path); - if (auto new_path = checkFileExistsAndCreateNewKeyIfNeeded(local_context, object_storage.get(), configuration.blobs_paths.back(), configuration.blobs_paths.size())) + if (auto new_path = checkFileExistsAndCreateNewKeyIfNeeded(local_context, object_storage.get(), path, configuration.blobs_paths.size())) + { configuration.blobs_paths.push_back(*new_path); + path = *new_path; + } return std::make_shared( configuration.format, @@ -903,7 +907,7 @@ SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMeta format_settings, chosen_compression_method, object_storage.get(), - configuration.blobs_paths.back()); + path); } } diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index f747bbf6b28..94456d50558 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1968,22 +1968,22 @@ SinkToStoragePtr StorageFile::write( "Table '{}' is in readonly mode because of globs in filepath", getStorageID().getNameForLogs()); - path = paths.back(); + path = paths.front(); fs::create_directories(fs::path(path).parent_path()); std::error_code error_code; if (!context->getSettingsRef().engine_file_truncate_on_insert && !is_path_with_globs && !FormatFactory::instance().checkIfFormatSupportAppend(format_name, context, format_settings) - && fs::file_size(paths.back(), error_code) != 0 && !error_code) + && fs::file_size(path, error_code) != 0 && !error_code) { if (context->getSettingsRef().engine_file_allow_create_multiple_files) { - auto pos = paths[0].find_first_of('.', paths[0].find_last_of('/')); + auto pos = path.find_first_of('.', path.find_last_of('/')); size_t index = paths.size(); String new_path; do { - new_path = paths[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : paths[0].substr(pos)); + new_path = path.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : path.substr(pos)); ++index; } while (fs::exists(new_path)); diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index cc8e5cbf364..f0f470d84fa 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -1281,6 +1281,7 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline, SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) { auto query_configuration = updateConfigurationAndGetCopy(local_context); + auto key = query_configuration.keys.front(); auto sample_block = metadata_snapshot->getSampleBlock(); auto chosen_compression_method = chooseCompressionMethod(query_configuration.keys.back(), query_configuration.compression_method); @@ -1300,7 +1301,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr chosen_compression_method, query_configuration, query_configuration.url.bucket, - query_configuration.keys.back()); + key); } else { @@ -1308,10 +1309,11 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key); - if (auto new_key = checkFileExistsAndCreateNewKeyIfNeeded(local_context, configuration, query_configuration.keys.back(), query_configuration.keys.size())) + if (auto new_key = checkFileExistsAndCreateNewKeyIfNeeded(local_context, configuration, query_configuration.keys.front(), query_configuration.keys.size())) { query_configuration.keys.push_back(*new_key); configuration.keys.push_back(*new_key); + key = *new_key; } return std::make_shared( @@ -1322,7 +1324,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr chosen_compression_method, query_configuration, query_configuration.url.bucket, - query_configuration.keys.back()); + key); } }