mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge 8b01e17be4
into f4038e3153
This commit is contained in:
commit
600b1d56c3
@ -3,6 +3,7 @@
|
||||
#include <IO/AsyncReadCounters.h>
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
#include <base/getThreadId.h>
|
||||
#include <base/demangle.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/ElapsedTimeProfileEventIncrement.h>
|
||||
@ -41,6 +42,11 @@ namespace CurrentMetrics
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
struct AsyncReadIncrement : boost::noncopyable
|
||||
@ -136,6 +142,7 @@ IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request, b
|
||||
if (request.ignore)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore);
|
||||
chassert(request.ignore < request.size);
|
||||
reader.ignore(request.ignore);
|
||||
}
|
||||
}
|
||||
@ -152,11 +159,15 @@ IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request, b
|
||||
IAsynchronousReader::Result read_result;
|
||||
if (result)
|
||||
{
|
||||
chassert(reader.buffer().begin() == request.buf);
|
||||
chassert(reader.buffer().end() <= request.buf + request.size);
|
||||
read_result.size = reader.buffer().size();
|
||||
read_result.offset = reader.offset();
|
||||
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size);
|
||||
if (reader.position() < request.buf || reader.buffer().end() > request.buf + request.size)
|
||||
{
|
||||
/// `reader` didn't put the data into the expected buffer. Maybe the ReadBuffer doesn't
|
||||
/// support external buffer, maybe we switched buffer without calling reader.set().
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "ThreadPoolRemoteFSReader got unexpected address range (seek_performed = {}, reader type: {}).", seek_performed, demangle(typeid(reader).name()));
|
||||
}
|
||||
read_result.size = reader.buffer().end() - request.buf;
|
||||
read_result.offset = reader.position() - request.buf;
|
||||
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, reader.buffer().size());
|
||||
}
|
||||
|
||||
read_result.execution_watch = std::move(watch);
|
||||
|
@ -29,9 +29,6 @@ private:
|
||||
class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor
|
||||
{
|
||||
public:
|
||||
/// `reader_` implementation must ensure that next() places data at the start of internal_buffer,
|
||||
/// even if there was previously a seek. I.e. seek() shouldn't leave pending data (no short seek
|
||||
/// optimization), and nextImpl() shouldn't assign nextimpl_working_buffer_offset.
|
||||
explicit RemoteFSFileDescriptor(
|
||||
SeekableReadBuffer & reader_,
|
||||
std::shared_ptr<AsyncReadCounters> async_read_counters_)
|
||||
|
@ -62,25 +62,35 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
|
||||
|
||||
ReadSettings LocalObjectStorage::patchSettings(const ReadSettings & read_settings) const
|
||||
{
|
||||
if (!read_settings.enable_filesystem_cache)
|
||||
LocalFSReadMethod changed_method = read_settings.local_fs_method;
|
||||
switch (read_settings.local_fs_method)
|
||||
{
|
||||
case LocalFSReadMethod::mmap:
|
||||
/// MMap read buffer doesn't support external buffer, required by ReadBufferFromRemoteFSGather.
|
||||
changed_method = LocalFSReadMethod::pread;
|
||||
LOG_INFO(log, "Changing local object storage filesystem read method from `mmap` to `pread`");
|
||||
break;
|
||||
case LocalFSReadMethod::pread_threadpool:
|
||||
case LocalFSReadMethod::pread_fake_async:
|
||||
case LocalFSReadMethod::io_uring:
|
||||
if (read_settings.enable_filesystem_cache)
|
||||
{
|
||||
/// For now we cannot allow asynchronous reader from local filesystem when CachedObjectStorage is used.
|
||||
changed_method = LocalFSReadMethod::pread;
|
||||
LOG_INFO(log, "Changing local object storage filesystem read method to `pread`");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (changed_method == read_settings.local_fs_method && read_settings.direct_io_threshold == 0)
|
||||
return IObjectStorage::patchSettings(read_settings);
|
||||
|
||||
auto modified_settings{read_settings};
|
||||
/// For now we cannot allow asynchronous reader from local filesystem when CachedObjectStorage is used.
|
||||
switch (modified_settings.local_fs_method)
|
||||
{
|
||||
case LocalFSReadMethod::pread_threadpool:
|
||||
case LocalFSReadMethod::pread_fake_async:
|
||||
{
|
||||
modified_settings.local_fs_method = LocalFSReadMethod::pread;
|
||||
LOG_INFO(log, "Changing local filesystem read method to `pread`");
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
modified_settings.local_fs_method = changed_method;
|
||||
/// O_DIRECT imposes buffer alignment restrictions, which ReadBufferFromRemoteFSGather doesn't support.
|
||||
modified_settings.direct_io_threshold = 0;
|
||||
return IObjectStorage::patchSettings(modified_settings);
|
||||
}
|
||||
|
||||
|
@ -49,6 +49,8 @@ public:
|
||||
size_t size = 0;
|
||||
char * buf = nullptr;
|
||||
Priority priority;
|
||||
/// Skip the first `ignore` bytes. I.e. read `size - ignore` bytes at `offset + ignore`.
|
||||
/// The skipping is done using ReadBuffer::ignore.
|
||||
size_t ignore = 0;
|
||||
};
|
||||
|
||||
|
@ -1,2 +1,5 @@
|
||||
1 test
|
||||
1 test
|
||||
1 test
|
||||
1 test
|
||||
1 test
|
||||
|
@ -14,6 +14,10 @@ SETTINGS disk = disk(
|
||||
INSERT INTO test SELECT 1, 'test';
|
||||
SELECT * FROM test;
|
||||
|
||||
SELECT * FROM test SETTINGS min_bytes_to_use_mmap_io=1, local_filesystem_read_method='mmap';
|
||||
SELECT * FROM test SETTINGS local_filesystem_read_method='io_uring';
|
||||
SELECT * FROM test SETTINGS min_bytes_to_use_direct_io=1;
|
||||
|
||||
DROP TABLE test SYNC;
|
||||
|
||||
CREATE TABLE test (a Int32, b String)
|
||||
|
Loading…
Reference in New Issue
Block a user