Merge pull request #30191 from azat/aio-O_DIRECT-fix

Fix pread_fake_async/pread_threadpool with min_bytes_to_use_direct_io
This commit is contained in:
alexey-milovidov 2021-10-18 22:29:31 +03:00 committed by GitHub
commit 885c4daf94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 55 additions and 5 deletions

View File

@ -40,6 +40,7 @@ public:
: ReadBufferFromFileBase(buf_size, existing_memory, alignment),
reader(std::move(reader_)), priority(priority_), required_alignment(alignment), fd(fd_)
{
prefetch_buffer.alignment = alignment;
}
~AsynchronousReadBufferFromFileDescriptor() override;

View File

@ -88,7 +88,11 @@ struct Memory : boost::noncopyable, Allocator
}
else
{
size_t new_capacity = align(new_size + pad_right, alignment);
size_t new_capacity = align(new_size, alignment) + pad_right;
size_t diff = new_capacity - m_capacity;
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, diff);
m_data = static_cast<char *>(Allocator::realloc(m_data, m_capacity, new_capacity, alignment));
m_capacity = new_capacity;
m_size = m_capacity - pad_right;
@ -101,6 +105,9 @@ private:
if (!alignment)
return value;
if (!(value % alignment))
return value;
return (value + alignment - 1) / alignment * alignment;
}
@ -112,12 +119,10 @@ private:
return;
}
size_t padded_capacity = m_capacity + pad_right;
ProfileEvents::increment(ProfileEvents::IOBufferAllocs);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, padded_capacity);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, m_capacity);
size_t new_capacity = align(padded_capacity, alignment);
size_t new_capacity = align(m_capacity, alignment) + pad_right;
m_data = static_cast<char *>(Allocator::alloc(new_capacity, alignment));
m_capacity = new_capacity;
m_size = m_capacity - pad_right;

View File

@ -5,6 +5,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/MemorySanitizer.h>
#include <base/errnoToString.h>
#include <Poco/Event.h>
#include <future>
@ -151,6 +152,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
else
{
bytes_read += res;
__msan_unpoison(request.buf, res);
}
}

View File

@ -0,0 +1,11 @@
{% for read_method in ['read', 'mmap', 'pread_threadpool', 'pread_fake_async'] -%}
{% for direct_io in [0, 1] -%}
{% for prefetch in [0, 1] -%}
{% for priority in [0, 1] -%}
{% for buffer_size in [65505, 1048576] -%}
1000000
{% endfor -%}
{% endfor -%}
{% endfor -%}
{% endfor -%}
{% endfor -%}

View File

@ -0,0 +1,31 @@
-- Tags: long
--
-- Test for testing various read settings.
drop table if exists data_02051;
create table data_02051 (key Int, value String) engine=MergeTree() order by key
as select number, repeat(toString(number), 5) from numbers(1e6);
{# check each local_filesystem_read_method #}
{% for read_method in ['read', 'mmap', 'pread_threadpool', 'pread_fake_async'] %}
{# check w/ O_DIRECT and w/o (min_bytes_to_use_direct_io) #}
{% for direct_io in [0, 1] %}
{# check local_filesystem_read_prefetch (just a smoke test) #}
{% for prefetch in [0, 1] %}
{# check read_priority (just a smoke test) #}
{% for priority in [0, 1] %}
{# check alignment for O_DIRECT with various max_read_buffer_size #}
{% for buffer_size in [65505, 1048576] %}
select count(ignore(*)) from data_02051 settings
min_bytes_to_use_direct_io={{ direct_io }},
local_filesystem_read_method='{{ read_method }}',
local_filesystem_read_prefetch={{ prefetch }},
read_priority={{ priority }},
max_read_buffer_size={{ buffer_size }}
;
{% endfor %}
{% endfor %}
{% endfor %}
{% endfor %}
{% endfor %}