Remove asynchronous processing for local storage

This commit is contained in:
divanik 2024-07-18 15:03:12 +00:00
parent 84559ab31c
commit 8fad286025
6 changed files with 20 additions and 91 deletions

View File

@ -114,6 +114,7 @@ class IColumn;
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(Bool, local_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in local object storage engine tables", 0) \
M(Bool, s3_skip_empty_files, false, "Allow to skip empty files in s3 table engine", 0) \
M(Bool, azure_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in azure engine tables", 0) \
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \

View File

@ -79,8 +79,9 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
{"backup_restore_s3_retry_attempts", 1000,1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore."},
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}
}},
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."},
{"local_create_new_file_on_insert", false, false, "Enables or disables creating a new file on each insert in local object storage engine tables"}
}},
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},
{"input_format_parquet_use_native_reader", false, false, "When reading Parquet files, to use native reader instead of arrow reader."},

View File

@ -51,97 +51,21 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
{
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator = [=](bool /* restricted_seek */, const StoredObject & object) -> std::unique_ptr<ReadBufferFromFileBase>
auto read_buffer_creator =
[=] (bool /* restricted_seek */, const StoredObject & object)
-> std::unique_ptr<ReadBufferFromFileBase>
{
LOG_DEBUG(&Poco::Logger::get("Read"), "Remote Path: {}", object.remote_path);
auto reader = createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size);
std::string answer(1000, ' ');
size_t read_bytes = reader->read(answer.data(), 1000);
LOG_DEBUG(&Poco::Logger::get("ReadBufferFromFileBase"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes));
return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size);
};
LOG_DEBUG(&Poco::Logger::get("Method"), "Method read: {}", read_settings.remote_fs_method == RemoteFSReadMethod::read);
switch (read_settings.remote_fs_method)
{
case RemoteFSReadMethod::read:
{
// auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>(
// std::move(read_buffer_creator),
// objects,
// "file:",
// modified_settings,
// global_context->getFilesystemCacheLog(),
// /* use_external_buffer */ true);
// std::string answer(1000, ' ');
// size_t read_bytes = impl2->read(answer.data(), 1000);
// LOG_DEBUG(
// &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes));
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, "file:", modified_settings,
global_context->getFilesystemCacheLog(), /* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
// LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Threadpool");
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, "file:", modified_settings,
global_context->getFilesystemCacheLog(), /* use_external_buffer */true);
// auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>(
// std::move(read_buffer_creator),
// objects,
// "file:",
// modified_settings,
// global_context->getFilesystemCacheLog(),
// /* use_external_buffer */ true);
// std::string answer(1000, ' ');
// size_t read_bytes = impl2->read(answer.data(), 1000);
// LOG_DEBUG(
// &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes));
// auto impl3 = std::make_unique<ReadBufferFromRemoteFSGather>(
// std::move(read_buffer_creator),
// objects,
// "file:",
// modified_settings,
// global_context->getFilesystemCacheLog(),
// /* use_external_buffer */ true);
// auto & reader2 = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
// auto async_reader = std::make_unique<AsynchronousBoundedReadBuffer>(
// std::move(impl3),
// reader2,
// read_settings,
// global_context->getAsyncReadCounters(),
// global_context->getFilesystemReadPrefetchesLog());
// answer = std::string(1000, ' ');
// read_bytes = async_reader->read(answer.data(), 1000);
// LOG_DEBUG(
// &Poco::Logger::get("AsynchronousBoundedReadBuffer"),
// "Read bytes: {}, string: {}",
// read_bytes,
// answer.substr(0, read_bytes));
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl),
reader,
read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
}
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"file:",
modified_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */ false);
}
ReadSettings LocalObjectStorage::patchSettings(const ReadSettings & read_settings) const

View File

@ -66,7 +66,7 @@ StorageObjectStorage::QuerySettings StorageLocalConfiguration::getQuerySettings(
const auto & settings = context->getSettingsRef();
return StorageObjectStorage::QuerySettings{
.truncate_on_insert = settings.engine_file_truncate_on_insert,
.create_new_file_on_insert = false,
.create_new_file_on_insert = settings.local_create_new_file_on_insert,
.schema_inference_use_cache = settings.schema_inference_use_cache_for_file,
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.engine_file_skip_empty_files,

View File

@ -461,7 +461,7 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
auto read_settings = context_->getReadSettings().adjustBufferSize(object_size);
read_settings.remote_fs_method = RemoteFSReadMethod::read;
// read_settings.remote_fs_method = RemoteFSReadMethod::read;
LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Method threadpool: {}", read_settings.remote_fs_method == RemoteFSReadMethod::threadpool);

View File

@ -40,7 +40,10 @@ std::optional<String> checkAndGetNewFileOnInsertIfNeeded(
"Object in bucket {} with key {} already exists. "
"If you want to overwrite it, enable setting {}_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting {}_create_new_file_on_insert",
configuration.getNamespace(), key, configuration.getTypeName(), configuration.getTypeName());
configuration.getNamespace(),
key,
configuration.getTypeName() == "local" ? "engine_file" : configuration.getTypeName(),
configuration.getTypeName());
}
void resolveSchemaAndFormat(