Debug code

This commit is contained in:
divanik 2024-07-18 10:06:06 +00:00
parent a760ad9446
commit 84559ab31c
4 changed files with 120 additions and 93 deletions

View File

@ -9,6 +9,7 @@
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h> #include <IO/copyData.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <Common/getRandomASCIIString.h> #include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
@ -58,27 +59,28 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
std::string answer(1000, ' '); std::string answer(1000, ' ');
size_t read_bytes = reader->read(answer.data(), 1000); size_t read_bytes = reader->read(answer.data(), 1000);
LOG_DEBUG(&Poco::Logger::get("ReadBufferFromFileBase 00"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); LOG_DEBUG(&Poco::Logger::get("ReadBufferFromFileBase"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes));
return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size); return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size);
}; };
LOG_DEBUG(&Poco::Logger::get("Method"), "Method read: {}", read_settings.remote_fs_method == RemoteFSReadMethod::read);
switch (read_settings.remote_fs_method) switch (read_settings.remote_fs_method)
{ {
case RemoteFSReadMethod::read: case RemoteFSReadMethod::read:
{ {
auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>( // auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), // std::move(read_buffer_creator),
objects, // objects,
"file:", // "file:",
modified_settings, // modified_settings,
global_context->getFilesystemCacheLog(), // global_context->getFilesystemCacheLog(),
/* use_external_buffer */ true); // /* use_external_buffer */ true);
std::string answer(1000, ' '); // std::string answer(1000, ' ');
size_t read_bytes = impl2->read(answer.data(), 1000); // size_t read_bytes = impl2->read(answer.data(), 1000);
LOG_DEBUG( // LOG_DEBUG(
&Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); // &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes));
return std::make_unique<ReadBufferFromRemoteFSGather>( return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, "file:", modified_settings, std::move(read_buffer_creator), objects, "file:", modified_settings,
@ -86,56 +88,58 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
} }
case RemoteFSReadMethod::threadpool: case RemoteFSReadMethod::threadpool:
{ {
LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Threadpool"); // LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Threadpool");
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>( auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, "file:", modified_settings, std::move(read_buffer_creator), objects, "file:", modified_settings,
global_context->getFilesystemCacheLog(), /* use_external_buffer */true); global_context->getFilesystemCacheLog(), /* use_external_buffer */true);
auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>( // auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), // std::move(read_buffer_creator),
objects, // objects,
"file:", // "file:",
modified_settings, // modified_settings,
global_context->getFilesystemCacheLog(), // global_context->getFilesystemCacheLog(),
/* use_external_buffer */ true); // /* use_external_buffer */ true);
std::string answer(1000, ' '); // std::string answer(1000, ' ');
size_t read_bytes = impl2->read(answer.data(), 1000); // size_t read_bytes = impl2->read(answer.data(), 1000);
LOG_DEBUG( // LOG_DEBUG(
&Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); // &Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes));
auto impl3 = std::make_unique<ReadBufferFromRemoteFSGather>( // auto impl3 = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), // std::move(read_buffer_creator),
objects, // objects,
"file:", // "file:",
modified_settings, // modified_settings,
global_context->getFilesystemCacheLog(), // global_context->getFilesystemCacheLog(),
/* use_external_buffer */ true); // /* use_external_buffer */ true);
// auto & reader2 = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
// auto async_reader = std::make_unique<AsynchronousBoundedReadBuffer>(
// std::move(impl3),
// reader2,
// read_settings,
// global_context->getAsyncReadCounters(),
// global_context->getFilesystemReadPrefetchesLog());
// answer = std::string(1000, ' ');
// read_bytes = async_reader->read(answer.data(), 1000);
// LOG_DEBUG(
// &Poco::Logger::get("AsynchronousBoundedReadBuffer"),
// "Read bytes: {}, string: {}",
// read_bytes,
// answer.substr(0, read_bytes));
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousBoundedReadBuffer>( return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl3), std::move(impl),
reader, reader,
read_settings, read_settings,
global_context->getAsyncReadCounters(), global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog()); global_context->getFilesystemReadPrefetchesLog());
answer = std::string(1000, ' ');
read_bytes = async_reader->read(answer.data(), 1000);
LOG_DEBUG(
&Poco::Logger::get("AsynchronousBoundedReadBuffer"),
"Read bytes: {}, string: {}",
read_bytes,
answer.substr(0, read_bytes));
// reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
} }
} }
} }

View File

@ -200,21 +200,41 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
restricted_seek); restricted_seek);
}; };
// auto read_buffer_creator2 = [this, settings_ptr, disk_read_settings](
// bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
// {
// return std::make_unique<ReadBufferFromS3>(
// client.get(),
// uri.bucket,
// object_.remote_path,
// uri.version_id,
// settings_ptr->request_settings,
// disk_read_settings,
// /* use_external_buffer */ true,
// /* offset */ 0,
// /* read_until_position */ 0,
// restricted_seek);
// };
switch (read_settings.remote_fs_method) switch (read_settings.remote_fs_method)
{ {
case RemoteFSReadMethod::read: case RemoteFSReadMethod::read:
{ {
auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>( // auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), // std::move(read_buffer_creator2),
objects, // objects,
"s3:" + uri.bucket + "/", // "s3:" + uri.bucket + "/",
disk_read_settings, // disk_read_settings,
global_context->getFilesystemCacheLog(), // global_context->getFilesystemCacheLog(),
/* use_external_buffer */ true); // /* use_external_buffer */ false);
std::string answer(1000, ' '); // std::string answer(1000, ' ');
size_t read_bytes = impl2->read(answer.data(), 1000); // size_t read_bytes = impl2->read(answer.data(), 1000);
LOG_DEBUG( // LOG_DEBUG(
&Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes)); // &Poco::Logger::get("ReadBufferFromRemoteFSGather 000"),
// "Read bytes: {}, string: {}",
// read_bytes,
// answer.substr(0, read_bytes));
return std::make_unique<ReadBufferFromRemoteFSGather>( return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), std::move(read_buffer_creator),
@ -224,8 +244,23 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
global_context->getFilesystemCacheLog(), global_context->getFilesystemCacheLog(),
/* use_external_buffer */false); /* use_external_buffer */false);
} }
case RemoteFSReadMethod::threadpool: case RemoteFSReadMethod::threadpool: {
{ // auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>(
// std::move(read_buffer_creator2),
// objects,
// "s3:" + uri.bucket + "/",
// disk_read_settings,
// global_context->getFilesystemCacheLog(),
// /* use_external_buffer */ true);
// std::string answer(1000, ' ');
// size_t read_bytes = impl2->read(answer.data(), 1000);
// LOG_DEBUG(
// &Poco::Logger::get("ReadBufferFromRemoteFSGather 001"),
// "Read bytes: {}, string: {}",
// read_bytes,
// answer.substr(0, read_bytes));
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>( auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), std::move(read_buffer_creator),
objects, objects,
@ -234,20 +269,6 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
global_context->getFilesystemCacheLog(), global_context->getFilesystemCacheLog(),
/* use_external_buffer */ true); /* use_external_buffer */ true);
auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"s3:" + uri.bucket + "/",
disk_read_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */ true);
std::string answer(1000, ' ');
size_t read_bytes = impl2->read(answer.data(), 1000);
LOG_DEBUG(
&Poco::Logger::get("ReadBufferFromRemoteFSGather"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes));
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>( return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, disk_read_settings, std::move(impl), reader, disk_read_settings,

View File

@ -157,7 +157,7 @@ public:
ContextPtr local_context, ContextPtr local_context,
bool with_table_structure); bool with_table_structure);
/// Storage type: s3, hdfs, azure. /// Storage type: s3, hdfs, azure, local.
virtual std::string getTypeName() const = 0; virtual std::string getTypeName() const = 0;
/// Engine name: S3, HDFS, Azure. /// Engine name: S3, HDFS, Azure.
virtual std::string getEngineName() const = 0; virtual std::string getEngineName() const = 0;

View File

@ -461,6 +461,8 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
auto read_settings = context_->getReadSettings().adjustBufferSize(object_size); auto read_settings = context_->getReadSettings().adjustBufferSize(object_size);
read_settings.remote_fs_method = RemoteFSReadMethod::read;
LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Method threadpool: {}", read_settings.remote_fs_method == RemoteFSReadMethod::threadpool); LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Method threadpool: {}", read_settings.remote_fs_method == RemoteFSReadMethod::threadpool);
read_settings.enable_filesystem_cache = false; read_settings.enable_filesystem_cache = false;
@ -477,28 +479,28 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
// Create a read buffer that will prefetch the first ~1 MB of the file. // Create a read buffer that will prefetch the first ~1 MB of the file.
// When reading lots of tiny files, this prefetching almost doubles the throughput. // When reading lots of tiny files, this prefetching almost doubles the throughput.
// For bigger files, parallel reading is more useful. // For bigger files, parallel reading is more useful.
if (use_prefetch) // if (use_prefetch)
{ // {
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size); LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
LOG_DEBUG(&Poco::Logger::get("Read objects"), "Path: {}, object size: {}", object_info.getPath(), object_size); LOG_DEBUG(&Poco::Logger::get("Read objects"), "Path: {}, object size: {}", object_info.getPath(), object_size);
auto async_reader = object_storage->readObjects( auto async_reader
StoredObjects{StoredObject{object_info.getPath(), /* local_path */ "", object_size}}, read_settings); = object_storage->readObjects(StoredObjects{StoredObject{object_info.getPath(), /* local_path */ "", object_size}}, read_settings);
async_reader->setReadUntilEnd(); async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch) if (read_settings.remote_fs_prefetch)
async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY); async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY);
return async_reader; return async_reader;
} // }
else // else
{ // {
/// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting. // /// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting.
LOG_DEBUG(&Poco::Logger::get("Read object"), "Path: {}, object size: {}", object_info.getPath(), object_size); // LOG_DEBUG(&Poco::Logger::get("Read object"), "Path: {}, object size: {}", object_info.getPath(), object_size);
return object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings); // return object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings);
} // }
} }
StorageObjectStorageSource::IIterator::IIterator(const std::string & logger_name_) StorageObjectStorageSource::IIterator::IIterator(const std::string & logger_name_)