mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Try to fix bug
This commit is contained in:
parent
f5e993df2a
commit
29ce915d00
@ -169,6 +169,7 @@ void ReadBufferFromRemoteFSGather::initialize()
|
|||||||
|
|
||||||
bool ReadBufferFromRemoteFSGather::nextImpl()
|
bool ReadBufferFromRemoteFSGather::nextImpl()
|
||||||
{
|
{
|
||||||
|
// LOG_DEBU
|
||||||
/// Find first available buffer that fits to given offset.
|
/// Find first available buffer that fits to given offset.
|
||||||
if (!current_buf)
|
if (!current_buf)
|
||||||
initialize();
|
initialize();
|
||||||
@ -230,6 +231,7 @@ void ReadBufferFromRemoteFSGather::reset()
|
|||||||
{
|
{
|
||||||
current_object = StoredObject();
|
current_object = StoredObject();
|
||||||
current_buf_idx = {};
|
current_buf_idx = {};
|
||||||
|
// buffer_cemetery_.push_back(current_buf);
|
||||||
current_buf.reset();
|
current_buf.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,6 +85,8 @@ private:
|
|||||||
size_t current_buf_idx = 0;
|
size_t current_buf_idx = 0;
|
||||||
SeekableReadBufferPtr current_buf;
|
SeekableReadBufferPtr current_buf;
|
||||||
|
|
||||||
|
std::deque<SeekableReadBufferPtr> buffer_cemetery_;
|
||||||
|
|
||||||
LoggerPtr log;
|
LoggerPtr log;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1,14 +1,15 @@
|
|||||||
|
#include <Disks/IO/IOUringReader.h>
|
||||||
|
#include <Disks/IO/ThreadPoolReader.h>
|
||||||
#include <Disks/IO/createReadBufferFromFileBase.h>
|
#include <Disks/IO/createReadBufferFromFileBase.h>
|
||||||
|
#include <Disks/IO/getIOUringReader.h>
|
||||||
|
#include <Disks/IO/getThreadPoolReader.h>
|
||||||
|
#include <IO/AsynchronousReadBufferFromFile.h>
|
||||||
|
#include <IO/AsynchronousReader.h>
|
||||||
|
#include <IO/MMapReadBufferFromFileWithCache.h>
|
||||||
#include <IO/ReadBufferFromEmptyFile.h>
|
#include <IO/ReadBufferFromEmptyFile.h>
|
||||||
#include <IO/ReadBufferFromFile.h>
|
#include <IO/ReadBufferFromFile.h>
|
||||||
#include <IO/MMapReadBufferFromFileWithCache.h>
|
|
||||||
#include <IO/AsynchronousReadBufferFromFile.h>
|
|
||||||
#include <Disks/IO/IOUringReader.h>
|
|
||||||
#include <Disks/IO/getIOUringReader.h>
|
|
||||||
#include <Disks/IO/ThreadPoolReader.h>
|
|
||||||
#include <Disks/IO/getThreadPoolReader.h>
|
|
||||||
#include <IO/AsynchronousReader.h>
|
|
||||||
#include <Common/ProfileEvents.h>
|
#include <Common/ProfileEvents.h>
|
||||||
|
#include "ReadBufferFromRemoteFSGather.h"
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
|
|
||||||
namespace ProfileEvents
|
namespace ProfileEvents
|
||||||
@ -77,6 +78,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
|||||||
|
|
||||||
if (settings.local_fs_method == LocalFSReadMethod::read)
|
if (settings.local_fs_method == LocalFSReadMethod::read)
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("Read settings"), "Read settings 1");
|
||||||
res = std::make_unique<ReadBufferFromFile>(
|
res = std::make_unique<ReadBufferFromFile>(
|
||||||
filename,
|
filename,
|
||||||
buffer_size,
|
buffer_size,
|
||||||
@ -88,6 +90,8 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
|||||||
}
|
}
|
||||||
else if (settings.local_fs_method == LocalFSReadMethod::pread || settings.local_fs_method == LocalFSReadMethod::mmap)
|
else if (settings.local_fs_method == LocalFSReadMethod::pread || settings.local_fs_method == LocalFSReadMethod::mmap)
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("Read settings"), "Read settings 2");
|
||||||
|
|
||||||
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(
|
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(
|
||||||
filename,
|
filename,
|
||||||
buffer_size,
|
buffer_size,
|
||||||
@ -99,6 +103,8 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
|||||||
}
|
}
|
||||||
else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
|
else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("Read settings"), "Read settings 3");
|
||||||
|
|
||||||
#if USE_LIBURING
|
#if USE_LIBURING
|
||||||
auto & reader = getIOUringReaderOrThrow();
|
auto & reader = getIOUringReaderOrThrow();
|
||||||
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
|
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
|
||||||
@ -117,6 +123,8 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
|||||||
}
|
}
|
||||||
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
|
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("Read settings"), "Read settings 4");
|
||||||
|
|
||||||
auto & reader = getThreadPoolReader(FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
|
auto & reader = getThreadPoolReader(FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
|
||||||
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
|
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
|
||||||
reader,
|
reader,
|
||||||
@ -131,6 +139,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
|||||||
}
|
}
|
||||||
else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool)
|
else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool)
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("Read settings"), "Read settings 5");
|
||||||
auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
|
auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
|
||||||
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
|
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
|
||||||
reader,
|
reader,
|
||||||
@ -144,8 +153,11 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
|
|||||||
settings.local_throttler);
|
settings.local_throttler);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown read method");
|
{
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("Read settings"), "Read settings 6");
|
||||||
|
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown read method");
|
||||||
|
}
|
||||||
return res;
|
return res;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -48,11 +48,12 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
|
|||||||
{
|
{
|
||||||
auto modified_settings = patchSettings(read_settings);
|
auto modified_settings = patchSettings(read_settings);
|
||||||
auto global_context = Context::getGlobalContextInstance();
|
auto global_context = Context::getGlobalContextInstance();
|
||||||
auto read_buffer_creator =
|
auto read_buffer_creator = [=](bool /* restricted_seek */, const StoredObject & object) -> std::unique_ptr<ReadBufferFromFileBase>
|
||||||
[=] (bool /* restricted_seek */, const StoredObject & object)
|
|
||||||
-> std::unique_ptr<ReadBufferFromFileBase>
|
|
||||||
{
|
{
|
||||||
return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size);
|
LOG_DEBUG(&Poco::Logger::get("Get object path"), "Remote Path: {}", object.remote_path);
|
||||||
|
auto kek = createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size);
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("Buffer created"), "Remote Path: {}", object.remote_path);
|
||||||
|
return kek;
|
||||||
};
|
};
|
||||||
|
|
||||||
return std::make_unique<ReadBufferFromRemoteFSGather>(
|
return std::make_unique<ReadBufferFromRemoteFSGather>(
|
||||||
|
@ -103,6 +103,7 @@ public:
|
|||||||
*/
|
*/
|
||||||
bool ALWAYS_INLINE eof()
|
bool ALWAYS_INLINE eof()
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG();
|
||||||
return !hasPendingData() && !next();
|
return !hasPendingData() && !next();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,6 +183,8 @@ public:
|
|||||||
|
|
||||||
while (bytes_copied < n && !eof())
|
while (bytes_copied < n && !eof())
|
||||||
{
|
{
|
||||||
|
auto k = *pos;
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("Next symbol in read"), "Symbol: {}", k);
|
||||||
size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
|
size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
|
||||||
::memcpy(to + bytes_copied, pos, bytes_to_copy);
|
::memcpy(to + bytes_copied, pos, bytes_to_copy);
|
||||||
pos += bytes_to_copy;
|
pos += bytes_to_copy;
|
||||||
|
@ -413,10 +413,7 @@ std::future<StorageObjectStorageSource::ReaderHolder> StorageObjectStorageSource
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
|
std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
|
||||||
const ObjectInfo & object_info,
|
const ObjectInfo & object_info, const ObjectStoragePtr & object_storage, const ContextPtr & context_, const LoggerPtr & log)
|
||||||
const ObjectStoragePtr & object_storage,
|
|
||||||
const ContextPtr & context_,
|
|
||||||
const LoggerPtr & log)
|
|
||||||
{
|
{
|
||||||
const auto & object_size = object_info.metadata->size_bytes;
|
const auto & object_size = object_info.metadata->size_bytes;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user