From 8fad2860251abf3fc71b525522c7ce51bd4687aa Mon Sep 17 00:00:00 2001 From: divanik Date: Thu, 18 Jul 2024 15:03:12 +0000 Subject: [PATCH] Remove asynchronous processing for local storage --- src/Core/Settings.h | 1 + src/Core/SettingsChangesHistory.cpp | 5 +- .../Local/LocalObjectStorage.cpp | 96 ++----------------- .../ObjectStorage/Local/Configuration.cpp | 2 +- .../StorageObjectStorageSource.cpp | 2 +- src/Storages/ObjectStorage/Utils.cpp | 5 +- 6 files changed, 20 insertions(+), 91 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 92b1c750a55..10b869e58f7 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -114,6 +114,7 @@ class IColumn; M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \ M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \ M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \ + M(Bool, local_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in local object storage engine tables", 0) \ M(Bool, s3_skip_empty_files, false, "Allow to skip empty files in s3 table engine", 0) \ M(Bool, azure_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in azure engine tables", 0) \ M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 73fb28dc67a..5de4630758d 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -79,8 +79,9 @@ static std::initializer_list LocalObjectStorage::readObjects( /// NOL { auto modified_settings = patchSettings(read_settings); auto global_context = Context::getGlobalContextInstance(); - auto read_buffer_creator = 
[=](bool /* restricted_seek */, const StoredObject & object) -> std::unique_ptr + auto read_buffer_creator = + [=] (bool /* restricted_seek */, const StoredObject & object) + -> std::unique_ptr { - LOG_DEBUG(&Poco::Logger::get("Read"), "Remote Path: {}", object.remote_path); - - auto reader = createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size); - - std::string answer(1000, ' '); - size_t read_bytes = reader->read(answer.data(), 1000); - LOG_DEBUG(&Poco::Logger::get("ReadBufferFromFileBase"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); - return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size); }; - LOG_DEBUG(&Poco::Logger::get("Method"), "Method read: {}", read_settings.remote_fs_method == RemoteFSReadMethod::read); - switch (read_settings.remote_fs_method) - { - case RemoteFSReadMethod::read: - { - // auto impl2 = std::make_unique( - // std::move(read_buffer_creator), - // objects, - // "file:", - // modified_settings, - // global_context->getFilesystemCacheLog(), - // /* use_external_buffer */ true); - // std::string answer(1000, ' '); - // size_t read_bytes = impl2->read(answer.data(), 1000); - // LOG_DEBUG( - // &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); - - return std::make_unique( - std::move(read_buffer_creator), objects, "file:", modified_settings, - global_context->getFilesystemCacheLog(), /* use_external_buffer */false); - } - case RemoteFSReadMethod::threadpool: - { - // LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Threadpool"); - auto impl = std::make_unique( - std::move(read_buffer_creator), objects, "file:", modified_settings, - global_context->getFilesystemCacheLog(), /* use_external_buffer */true); - - // auto impl2 = std::make_unique( - // std::move(read_buffer_creator), - // objects, - // "file:", - // modified_settings, - // 
global_context->getFilesystemCacheLog(), - // /* use_external_buffer */ true); - - // std::string answer(1000, ' '); - // size_t read_bytes = impl2->read(answer.data(), 1000); - // LOG_DEBUG( - // &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); - - // auto impl3 = std::make_unique( - // std::move(read_buffer_creator), - // objects, - // "file:", - // modified_settings, - // global_context->getFilesystemCacheLog(), - // /* use_external_buffer */ true); - - - // auto & reader2 = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); - - // auto async_reader = std::make_unique( - // std::move(impl3), - // reader2, - // read_settings, - // global_context->getAsyncReadCounters(), - // global_context->getFilesystemReadPrefetchesLog()); - - // answer = std::string(1000, ' '); - // read_bytes = async_reader->read(answer.data(), 1000); - // LOG_DEBUG( - // &Poco::Logger::get("AsynchronousBoundedReadBuffer"), - // "Read bytes: {}, string: {}", - // read_bytes, - // answer.substr(0, read_bytes)); - - auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); - - return std::make_unique( - std::move(impl), - reader, - read_settings, - global_context->getAsyncReadCounters(), - global_context->getFilesystemReadPrefetchesLog()); - } - } + return std::make_unique( + std::move(read_buffer_creator), + objects, + "file:", + modified_settings, + global_context->getFilesystemCacheLog(), + /* use_external_buffer */ false); } ReadSettings LocalObjectStorage::patchSettings(const ReadSettings & read_settings) const diff --git a/src/Storages/ObjectStorage/Local/Configuration.cpp b/src/Storages/ObjectStorage/Local/Configuration.cpp index 740132f0981..364bd21a64e 100644 --- a/src/Storages/ObjectStorage/Local/Configuration.cpp +++ b/src/Storages/ObjectStorage/Local/Configuration.cpp @@ -66,7 +66,7 @@ StorageObjectStorage::QuerySettings 
StorageLocalConfiguration::getQuerySettings( const auto & settings = context->getSettingsRef(); return StorageObjectStorage::QuerySettings{ .truncate_on_insert = settings.engine_file_truncate_on_insert, - .create_new_file_on_insert = false, + .create_new_file_on_insert = settings.local_create_new_file_on_insert, .schema_inference_use_cache = settings.schema_inference_use_cache_for_file, .schema_inference_mode = settings.schema_inference_mode, .skip_empty_files = settings.engine_file_skip_empty_files, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 8b800cbc343..3053a48b75b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -461,7 +461,7 @@ std::unique_ptr StorageObjectStorageSource::createReadBuffer( auto read_settings = context_->getReadSettings().adjustBufferSize(object_size); - read_settings.remote_fs_method = RemoteFSReadMethod::read; + // read_settings.remote_fs_method = RemoteFSReadMethod::read; LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Method threadpool: {}", read_settings.remote_fs_method == RemoteFSReadMethod::threadpool); diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index e49e14d2a0c..6491deef440 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -40,7 +40,10 @@ std::optional checkAndGetNewFileOnInsertIfNeeded( "Object in bucket {} with key {} already exists. " "If you want to overwrite it, enable setting {}_truncate_on_insert, if you " "want to create a new file on each insert, enable setting {}_create_new_file_on_insert", - configuration.getNamespace(), key, configuration.getTypeName(), configuration.getTypeName()); + configuration.getNamespace(), + key, + configuration.getTypeName() == "local" ? 
"engine_file" : configuration.getTypeName(), + configuration.getTypeName()); } void resolveSchemaAndFormat(