Small improvement

This commit is contained in:
kssenii 2023-04-20 18:15:09 +02:00
parent 59211c490a
commit 6552ec6afc
5 changed files with 13 additions and 5 deletions

View File

@ -664,6 +664,7 @@ class IColumn;
M(Bool, allow_prefetched_read_pool_for_remote_filesystem, false, "Prefer prefetched threadpool if all parts are on remote filesystem", 0) \
M(Bool, allow_prefetched_read_pool_for_local_filesystem, false, "Prefer prefetched threadpool if all parts are on local filesystem", 0) \
\
M(UInt64, prefetch_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the prefetch buffer to read from the filesystem.", 0) \
M(UInt64, filesystem_prefetch_step_bytes, 0, "Prefetch step in bytes. Zero means `auto` - approximately the best prefetch step will be auto deduced, but might not be 100% the best. The actual value might be different because of setting filesystem_prefetch_min_bytes_for_single_read_task", 0) \
M(UInt64, filesystem_prefetch_step_marks, 0, "Prefetch step in marks. Zero means `auto` - approximately the best prefetch step will be auto deduced, but might not be 100% the best. The actual value might be different because of setting filesystem_prefetch_min_bytes_for_single_read_task", 0) \
M(UInt64, filesystem_prefetch_min_bytes_for_single_read_task, "8Mi", "Do not parallelize within one file read less than this amount of bytes. E.g. one reader will not receive a read task of size less than this amount. This setting is recommended to avoid spikes of time for aws getObject requests to aws", 0) \

View File

@ -53,7 +53,7 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
, reader(reader_)
, base_priority(settings_.priority)
, impl(impl_)
, prefetch_buffer(settings_.remote_fs_buffer_size)
, prefetch_buffer(settings_.prefetch_buffer_size)
, min_bytes_for_seek(min_bytes_for_seek_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
@ -139,7 +139,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch(int64_t priority)
last_prefetch_info.priority = priority;
/// Prefetch even in case hasPendingData() == true.
chassert(prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
chassert(prefetch_buffer.size() == read_settings.prefetch_buffer_size);
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}

View File

@ -70,6 +70,7 @@ struct ReadSettings
size_t local_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
size_t remote_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
size_t prefetch_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
bool local_fs_prefetch = false;
bool remote_fs_prefetch = false;
@ -127,6 +128,7 @@ struct ReadSettings
ReadSettings res = *this;
res.local_fs_buffer_size = std::min(file_size, local_fs_buffer_size);
res.remote_fs_buffer_size = std::min(file_size, remote_fs_buffer_size);
res.prefetch_buffer_size = std::min(file_size, prefetch_buffer_size);
return res;
}
};

View File

@ -4266,6 +4266,7 @@ ReadSettings Context::getReadSettings() const
res.local_fs_buffer_size = settings.max_read_buffer_size;
res.remote_fs_buffer_size = settings.max_read_buffer_size;
res.prefetch_buffer_size = settings.prefetch_buffer_size;
res.direct_io_threshold = settings.min_bytes_to_use_direct_io;
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;
res.priority = settings.read_priority;

View File

@ -525,20 +525,24 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
if (allowed_memory_usage
&& (allowed_prefetches_num.has_value() == false || allowed_prefetches_num.value() > 0))
{
/// adjustBufferSize(), which is done in MergeTreeReaderStream and MergeTreeReaderCompact,
/// lowers buffer size if file size (or required read range) is less. So we know that the
/// settings.prefetch_buffer_size will be lowered there, therefore we account it here as well.
/// But here we make a more approximate lowering, while in adjustBufferSize it will be precise.
size_t estimated_memory_usage = 0;
size_t estimated_prefetches_num = 0;
for (const auto & col : part.task_columns.columns)
{
const auto col_size = part.data_part->getColumnSize(col.name).data_uncompressed;
estimated_memory_usage += std::min<size_t>(col_size, settings.max_read_buffer_size);
estimated_memory_usage += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++estimated_prefetches_num;
}
if (reader_settings.apply_deleted_mask && part.data_part->hasLightweightDelete())
{
const auto col_size = part.data_part->getColumnSize(
LightweightDeleteDescription::FILTER_COLUMN.name).data_uncompressed;
estimated_memory_usage += std::min<size_t>(col_size, settings.max_read_buffer_size);
estimated_memory_usage += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++estimated_prefetches_num;
}
if (prewhere_info)
@ -548,7 +552,7 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
for (const auto & col : columns)
{
const size_t col_size = part.data_part->getColumnSize(col.name).data_uncompressed;
estimated_memory_usage += std::min<size_t>(col_size, settings.max_read_buffer_size);
estimated_memory_usage += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++estimated_prefetches_num;
}
}