Better performance for a smaller number of threads

kssenii 2021-10-15 15:07:39 +03:00
parent c1c574e9ca
commit edfb1a5521
10 changed files with 51 additions and 24 deletions


@ -255,7 +255,7 @@
M(RemoteFSSeeks, "Total number of seeks for async buffer") \
M(RemoteFSPrefetches, "Total number of prefetches") \
M(RemoteFSSeekCancelledPrefetches, "Number of cancelled prefetches because of seek") \
M(RemoteFSUnusedCancelledPrefetches, "Number of prefetches prending in buffer desctructor") \
M(RemoteFSUnusedCancelledPrefetches, "Number of prefetches pending in buffer destructor") \
M(RemoteFSPrefetchReads, "Total number of reads from prefetched buffer") \
M(RemoteFSAsyncBufferReads, "Number of nextImpl() calls for async buffer") \
M(RemoteFSSimpleBufferReads, "Number of nextImpl() calls for non-async buffer") \
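
For context, counters declared through this M(...) macro are bumped at the read/seek call sites. A minimal sketch of the usual pattern (the event name is from the diff above; the call site itself is hypothetical):

    #include <Common/ProfileEvents.h>

    namespace ProfileEvents
    {
        /// Declared by the M(...) macro expansion shown above.
        extern const Event RemoteFSPrefetches;
    }

    /// Hypothetical call site: count one prefetch issued by the async buffer.
    void onPrefetchIssued()
    {
        ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
    }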


@ -27,7 +27,7 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int LOGICAL_ERROR;
}
@ -160,7 +160,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
prefetch_future = {};
/// TODO: it does not really seem to improve anything to call prefecth() here,
/// TODO: it does not really seem to improve anything to call prefetch() here,
/// but it does not make anything worse at the same time.
/// Need to test, it might be useful because in fact sometimes (minority of cases though)
/// we can read without prefetching several times in a row.


@ -51,11 +51,11 @@ bool ReadBufferFromS3::nextImpl()
{
if (last_offset)
{
if (static_cast<off_t>(last_offset) == offset)
if (last_offset == offset)
return false;
if (static_cast<off_t>(last_offset) < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1);
if (last_offset < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, last_offset - 1);
}
bool next_result = false;
@ -173,8 +173,8 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
if (last_offset)
{
if (offset >= static_cast<off_t>(last_offset))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1);
if (offset >= last_offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, last_offset - 1);
req.SetRange(fmt::format("bytes={}-{}", offset, last_offset - 1));
LOG_DEBUG(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, last_offset - 1);


@ -59,7 +59,7 @@ private:
ReadSettings read_settings;
bool use_external_buffer;
size_t last_offset;
off_t last_offset;
};
}
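
Changing last_offset from size_t to off_t matches the signedness of offset and is what lets the .cpp hunk above drop its static_casts: mixed signed/unsigned comparisons silently convert the signed side. A small illustration (assumption: this is the motivation, the commit message does not state it):

    #include <cstdio>
    #include <sys/types.h>

    int main()
    {
        off_t offset = -1;          /// off_t is signed
        size_t last_unsigned = 100;

        /// -1 converts to SIZE_MAX, so the "obvious" comparison misfires
        /// (compilers flag the implicit form with -Wsign-compare):
        std::printf("%d\n", static_cast<size_t>(offset) < last_unsigned);  /// prints 0

        off_t last_signed = 100;
        std::printf("%d\n", offset < last_signed);                         /// prints 1
    }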


@ -23,7 +23,11 @@ namespace ErrorCodes
static constexpr size_t HTTP_MAX_TRIES = 10;
ReadBufferFromWebServer::ReadBufferFromWebServer(
const String & url_, ContextPtr context_, const ReadSettings & settings_, bool use_external_buffer_, size_t)
const String & url_,
ContextPtr context_,
const ReadSettings & settings_,
bool use_external_buffer_,
size_t last_offset_)
: SeekableReadBuffer(nullptr, 0)
, log(&Poco::Logger::get("ReadBufferFromWebServer"))
, context(context_)
@ -31,6 +35,7 @@ ReadBufferFromWebServer::ReadBufferFromWebServer(
, buf_size(settings_.remote_fs_buffer_size)
, read_settings(settings_)
, use_external_buffer(use_external_buffer_)
, last_offset(last_offset_)
{
}
@ -41,11 +46,13 @@ std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
auto right_offset = read_settings.remote_read_right_offset;
if (right_offset)
if (last_offset)
{
headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-{}", offset, right_offset)));
LOG_DEBUG(log, "Reading with range: {}-{}", offset, right_offset);
if (last_offset < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1);
headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-{}", offset, last_offset - 1)));
LOG_DEBUG(log, "Reading with range: {}-{}", offset, last_offset);
}
else
{
@ -120,6 +127,15 @@ void ReadBufferFromWebServer::initializeWithRetry()
bool ReadBufferFromWebServer::nextImpl()
{
if (last_offset)
{
if (last_offset == offset)
return false;
if (last_offset < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1);
}
if (impl)
{
if (use_external_buffer)
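
Note the off-by-one handled in the Range header above: HTTP ranges are inclusive on both ends, while last_offset is exclusive, hence the "- 1". A quick check of the formatting (uses fmt, as the diff does):

    #include <cassert>
    #include <fmt/format.h>
    #include <sys/types.h>

    int main()
    {
        off_t offset = 0;
        off_t last_offset = 100;    /// exclusive bound: read bytes [0, 100)
        /// The inclusive HTTP range for the same span is "bytes=0-99".
        assert(fmt::format("bytes={}-{}", offset, last_offset - 1) == "bytes=0-99");
    }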


@ -47,6 +47,8 @@ private:
ReadSettings read_settings;
bool use_external_buffer;
off_t last_offset = 0;
};
}


@ -77,8 +77,6 @@ struct ReadSettings
size_t remote_fs_read_max_backoff_ms = 10000;
size_t remote_fs_read_backoff_max_tries = 4;
size_t remote_read_right_offset = 0; /// Right offset for range reading.
bool http_retriable_read = true;
size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;


@ -265,6 +265,9 @@ namespace detail
if (next_callback)
next_callback(count());
if (total_bytes_to_read && bytes_read == total_bytes_to_read.value())
return false;
if (impl)
{
if (use_external_buffer)
@ -284,7 +287,7 @@ namespace detail
{
/**
* impl was initialized before, pass position() to it to make
* sure there is no pending data which was not read, becuase
* sure there is no pending data which was not read, because
* this branch means we read sequentially.
*/
if (!working_buffer.empty())
@ -292,9 +295,6 @@ namespace detail
}
}
if (total_bytes_to_read && bytes_read == total_bytes_to_read.value())
return false;
if (impl && !working_buffer.empty())
impl->position() = position();
@ -322,7 +322,7 @@ namespace detail
|| (bytes_read && !settings.http_retriable_read))
throw;
LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), e.what());
LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), "Error: {}, code: {}", e.what(), e.code());
impl.reset();
sleepForMilliseconds(milliseconds_to_wait);
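
The surrounding (elided) retry loop resets impl and backs off before the next attempt. A self-contained sketch of that shape, with hypothetical names and the backoff constants seen in ReadSettings and HTTP_MAX_TRIES above:

    #include <chrono>
    #include <cstdio>
    #include <stdexcept>
    #include <thread>

    /// Hypothetical stand-in for one read attempt: fails twice, then succeeds.
    bool tryReadOnce(size_t attempt)
    {
        if (attempt < 2)
            throw std::runtime_error("transient HTTP error");
        return true;
    }

    int main()
    {
        size_t milliseconds_to_wait = 100;      /// http_retry_initial_backoff_ms
        const size_t max_tries = 10;            /// HTTP_MAX_TRIES
        for (size_t attempt = 0; attempt < max_tries; ++attempt)
        {
            try
            {
                if (tryReadOnce(attempt))
                {
                    std::puts("read ok");
                    return 0;
                }
            }
            catch (const std::exception & e)
            {
                std::printf("Error: %s, retrying\n", e.what());  /// cf. the LOG_ERROR above
                /// impl.reset() would go here in the real buffer.
                std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds_to_wait));
                milliseconds_to_wait *= 2;      /// exponential backoff (capped in the real code)
            }
        }
        return 1;
    }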


@ -42,6 +42,12 @@ MergeTreeReadPool::MergeTreeReadPool(
{
/// parts don't contain duplicate MergeTreeDataParts.
const auto per_part_sum_marks = fillPerPartInfo(parts_ranges, check_columns_);
auto min_marks_for_concurrent_read = min_marks_for_concurrent_read_;
if (stored_on_remote_disk)
{
do_not_steal_tasks = true;
min_marks_for_concurrent_read = std::max(min_marks_for_concurrent_read, sum_marks_ / threads_);
}
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read);
}
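
The effect of the remote-disk branch is easiest to see with numbers: with few threads, each thread's minimum chunk grows to roughly its fair share of all marks, so threads read large contiguous ranges instead of trading small ones. A worked example (values are illustrative):

    #include <algorithm>
    #include <cassert>
    #include <cstddef>

    int main()
    {
        size_t min_marks_for_concurrent_read = 20;  /// hypothetical baseline
        const size_t sum_marks = 1000;
        const size_t threads = 4;

        /// Remote-disk adjustment from the constructor above:
        min_marks_for_concurrent_read = std::max(min_marks_for_concurrent_read, sum_marks / threads);

        /// Each of the 4 threads now aims at >= 250 marks, a quarter of the whole
        /// read, and with do_not_steal_tasks no other thread fragments that range.
        assert(min_marks_for_concurrent_read == 250);
    }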
@ -89,8 +95,11 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
auto & part = parts_with_idx[part_idx];
auto & marks_in_part = thread_tasks.sum_marks_in_parts.back();
/// Get whole part to read if it is small enough.
auto need_marks = std::min(marks_in_part, min_marks_to_read);
size_t need_marks;
if (stored_on_remote_disk) /// For better performance with remote disks
need_marks = marks_in_part;
else /// Get whole part to read if it is small enough.
need_marks = std::min(marks_in_part, min_marks_to_read);
/// Do not leave too few rows in the part for next time.
if (marks_in_part > need_marks &&
@ -223,7 +232,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
/// Turn off tasks stealing in case there is remote disk.
if (part.data_part->isStoredOnRemoteDisk())
do_not_steal_tasks = true;
stored_on_remote_disk = true;
/// Read marks for every data part.
size_t sum_marks = 0;


@ -139,6 +139,8 @@ private:
mutable std::mutex mutex;
Poco::Logger * log = &Poco::Logger::get("MergeTreeReadPool");
bool stored_on_remote_disk = false;
};
using MergeTreeReadPoolPtr = std::shared_ptr<MergeTreeReadPool>;