From 84559ab31cac8d994fbccf9085884c1c2ea8f4b8 Mon Sep 17 00:00:00 2001 From: divanik Date: Thu, 18 Jul 2024 10:06:06 +0000 Subject: [PATCH] Debug code --- .../Local/LocalObjectStorage.cpp | 100 +++++++++--------- .../ObjectStorages/S3/S3ObjectStorage.cpp | 75 ++++++++----- .../ObjectStorage/StorageObjectStorage.h | 2 +- .../StorageObjectStorageSource.cpp | 36 ++++--- 4 files changed, 120 insertions(+), 93 deletions(-) diff --git a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp index 3704e4bb672..607ce41dd0f 100644 --- a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -58,27 +59,28 @@ std::unique_ptr LocalObjectStorage::readObjects( /// NOL std::string answer(1000, ' '); size_t read_bytes = reader->read(answer.data(), 1000); - LOG_DEBUG(&Poco::Logger::get("ReadBufferFromFileBase 00"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); + LOG_DEBUG(&Poco::Logger::get("ReadBufferFromFileBase"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size); }; + LOG_DEBUG(&Poco::Logger::get("Method"), "Method read: {}", read_settings.remote_fs_method == RemoteFSReadMethod::read); switch (read_settings.remote_fs_method) { case RemoteFSReadMethod::read: { - auto impl2 = std::make_unique( - std::move(read_buffer_creator), - objects, - "file:", - modified_settings, - global_context->getFilesystemCacheLog(), - /* use_external_buffer */ true); + // auto impl2 = std::make_unique( + // std::move(read_buffer_creator), + // objects, + // "file:", + // modified_settings, + // global_context->getFilesystemCacheLog(), + // /* use_external_buffer */ true); - std::string answer(1000, ' '); - size_t read_bytes = impl2->read(answer.data(), 1000); - LOG_DEBUG( - &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); + // std::string answer(1000, ' '); + // size_t read_bytes = impl2->read(answer.data(), 1000); + // LOG_DEBUG( + // &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); return std::make_unique( std::move(read_buffer_creator), objects, "file:", modified_settings, @@ -86,56 +88,58 @@ std::unique_ptr LocalObjectStorage::readObjects( /// NOL } case RemoteFSReadMethod::threadpool: { - LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Threadpool"); + // LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Threadpool"); auto impl = std::make_unique( std::move(read_buffer_creator), objects, "file:", modified_settings, global_context->getFilesystemCacheLog(), /* use_external_buffer */true); - auto impl2 = std::make_unique( - std::move(read_buffer_creator), - objects, - "file:", - modified_settings, - global_context->getFilesystemCacheLog(), - /* use_external_buffer */ true); + // auto impl2 = std::make_unique( + // std::move(read_buffer_creator), + // objects, + // "file:", + // modified_settings, + // global_context->getFilesystemCacheLog(), + // /* use_external_buffer */ true); - std::string answer(1000, ' '); - size_t read_bytes = impl2->read(answer.data(), 1000); - LOG_DEBUG( - &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); + // std::string answer(1000, ' '); + // size_t read_bytes = impl2->read(answer.data(), 1000); + // LOG_DEBUG( + // &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); - auto impl3 = std::make_unique( - std::move(read_buffer_creator), - objects, - "file:", - modified_settings, - global_context->getFilesystemCacheLog(), - /* use_external_buffer */ true); + // auto impl3 = std::make_unique( + // std::move(read_buffer_creator), + // objects, + // "file:", + // modified_settings, + // global_context->getFilesystemCacheLog(), + // /* use_external_buffer */ true); + // auto & reader2 = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); + + // auto async_reader = std::make_unique( + // std::move(impl3), + // reader2, + // read_settings, + // global_context->getAsyncReadCounters(), + // global_context->getFilesystemReadPrefetchesLog()); + + // answer = std::string(1000, ' '); + // read_bytes = async_reader->read(answer.data(), 1000); + // LOG_DEBUG( + // &Poco::Logger::get("AsynchronousBoundedReadBuffer"), + // "Read bytes: {}, string: {}", + // read_bytes, + // answer.substr(0, read_bytes)); + auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); - auto async_reader = std::make_unique( - std::move(impl3), + return std::make_unique( + std::move(impl), reader, read_settings, global_context->getAsyncReadCounters(), global_context->getFilesystemReadPrefetchesLog()); - - answer = std::string(1000, ' '); - read_bytes = async_reader->read(answer.data(), 1000); - LOG_DEBUG( - &Poco::Logger::get("AsynchronousBoundedReadBuffer"), - "Read bytes: {}, string: {}", - read_bytes, - answer.substr(0, read_bytes)); - - // reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); - - return std::make_unique( - std::move(impl), reader, read_settings, - global_context->getAsyncReadCounters(), - global_context->getFilesystemReadPrefetchesLog()); } } } diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 267c3eaaea4..351f167b6b9 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -200,21 +200,41 @@ std::unique_ptr S3ObjectStorage::readObjects( /// NOLINT restricted_seek); }; + // auto read_buffer_creator2 = [this, settings_ptr, disk_read_settings]( + // bool restricted_seek, const StoredObject & object_) -> std::unique_ptr + // { + // return std::make_unique( + // client.get(), + // uri.bucket, + // object_.remote_path, + // uri.version_id, + // settings_ptr->request_settings, + // disk_read_settings, + // /* use_external_buffer */ true, + // /* offset */ 0, + // /* read_until_position */ 0, + // restricted_seek); + // }; + + switch (read_settings.remote_fs_method) { case RemoteFSReadMethod::read: { - auto impl2 = std::make_unique( - std::move(read_buffer_creator), - objects, - "s3:" + uri.bucket + "/", - disk_read_settings, - global_context->getFilesystemCacheLog(), - /* use_external_buffer */ true); - std::string answer(1000, ' '); - size_t read_bytes = impl2->read(answer.data(), 1000); - LOG_DEBUG( - &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); + // auto impl2 = std::make_unique( + // std::move(read_buffer_creator2), + // objects, + // "s3:" + uri.bucket + "/", + // disk_read_settings, + // global_context->getFilesystemCacheLog(), + // /* use_external_buffer */ false); + // std::string answer(1000, ' '); + // size_t read_bytes = impl2->read(answer.data(), 1000); + // LOG_DEBUG( + // &Poco::Logger::get("ReadBufferFromRemoteFSGather 000"), + // "Read bytes: {}, string: {}", + // read_bytes, + // answer.substr(0, read_bytes)); return std::make_unique( std::move(read_buffer_creator), @@ -224,17 +244,24 @@ std::unique_ptr S3ObjectStorage::readObjects( /// NOLINT global_context->getFilesystemCacheLog(), /* use_external_buffer */false); } - case RemoteFSReadMethod::threadpool: - { + case RemoteFSReadMethod::threadpool: { + // auto impl2 = std::make_unique( + // std::move(read_buffer_creator2), + // objects, + // "s3:" + uri.bucket + "/", + // disk_read_settings, + // global_context->getFilesystemCacheLog(), + // /* use_external_buffer */ true); + + // std::string answer(1000, ' '); + // size_t read_bytes = impl2->read(answer.data(), 1000); + // LOG_DEBUG( + // &Poco::Logger::get("ReadBufferFromRemoteFSGather 001"), + // "Read bytes: {}, string: {}", + // read_bytes, + // answer.substr(0, read_bytes)); + auto impl = std::make_unique( - std::move(read_buffer_creator), - objects, - "s3:" + uri.bucket + "/", - disk_read_settings, - global_context->getFilesystemCacheLog(), - /* use_external_buffer */true); - - auto impl2 = std::make_unique( std::move(read_buffer_creator), objects, "s3:" + uri.bucket + "/", @@ -242,12 +269,6 @@ std::unique_ptr S3ObjectStorage::readObjects( /// NOLINT global_context->getFilesystemCacheLog(), /* use_external_buffer */ true); - std::string answer(1000, ' '); - size_t read_bytes = impl2->read(answer.data(), 1000); - LOG_DEBUG( - &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); - - auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); return std::make_unique( std::move(impl), reader, disk_read_settings, diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 818ce055c77..7a13ada224c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -157,7 +157,7 @@ public: ContextPtr local_context, bool with_table_structure); - /// Storage type: s3, hdfs, azure. + /// Storage type: s3, hdfs, azure, local. virtual std::string getTypeName() const = 0; /// Engine name: S3, HDFS, Azure. virtual std::string getEngineName() const = 0; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index f664bed0204..8b800cbc343 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -461,6 +461,8 @@ std::unique_ptr StorageObjectStorageSource::createReadBuffer( auto read_settings = context_->getReadSettings().adjustBufferSize(object_size); + read_settings.remote_fs_method = RemoteFSReadMethod::read; + LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Method threadpool: {}", read_settings.remote_fs_method == RemoteFSReadMethod::threadpool); read_settings.enable_filesystem_cache = false; @@ -477,28 +479,28 @@ std::unique_ptr StorageObjectStorageSource::createReadBuffer( // Create a read buffer that will prefetch the first ~1 MB of the file. // When reading lots of tiny files, this prefetching almost doubles the throughput. // For bigger files, parallel reading is more useful. - if (use_prefetch) - { - LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size); + // if (use_prefetch) + // { + LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size); - LOG_DEBUG(&Poco::Logger::get("Read objects"), "Path: {}, object size: {}", object_info.getPath(), object_size); + LOG_DEBUG(&Poco::Logger::get("Read objects"), "Path: {}, object size: {}", object_info.getPath(), object_size); - auto async_reader = object_storage->readObjects( - StoredObjects{StoredObject{object_info.getPath(), /* local_path */ "", object_size}}, read_settings); + auto async_reader + = object_storage->readObjects(StoredObjects{StoredObject{object_info.getPath(), /* local_path */ "", object_size}}, read_settings); - async_reader->setReadUntilEnd(); - if (read_settings.remote_fs_prefetch) - async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY); + async_reader->setReadUntilEnd(); + if (read_settings.remote_fs_prefetch) + async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY); - return async_reader; - } - else - { - /// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting. - LOG_DEBUG(&Poco::Logger::get("Read object"), "Path: {}, object size: {}", object_info.getPath(), object_size); + return async_reader; + // } + // else + // { + // /// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting. + // LOG_DEBUG(&Poco::Logger::get("Read object"), "Path: {}, object size: {}", object_info.getPath(), object_size); - return object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings); - } + // return object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings); + // } } StorageObjectStorageSource::IIterator::IIterator(const std::string & logger_name_)