From edfb1a5521d83dd9722d2d705dba5284b6534270 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 15 Oct 2021 15:07:39 +0300 Subject: [PATCH] Better performance for smaller number of threads --- src/Common/ProfileEvents.cpp | 2 +- ...chronousReadIndirectBufferFromRemoteFS.cpp | 4 +-- src/IO/ReadBufferFromS3.cpp | 10 +++---- src/IO/ReadBufferFromS3.h | 2 +- src/IO/ReadBufferFromWebServer.cpp | 26 +++++++++++++++---- src/IO/ReadBufferFromWebServer.h | 2 ++ src/IO/ReadSettings.h | 2 -- src/IO/ReadWriteBufferFromHTTP.h | 10 +++---- src/Storages/MergeTree/MergeTreeReadPool.cpp | 15 ++++++++--- src/Storages/MergeTree/MergeTreeReadPool.h | 2 ++ 10 files changed, 51 insertions(+), 24 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index f9645875a6c..24dcbacebf9 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -255,7 +255,7 @@ M(RemoteFSSeeks, "Total number of seeks for async buffer") \ M(RemoteFSPrefetches, "Total number of prefetches") \ M(RemoteFSSeekCancelledPrefetches, "Number of cancelled prefecthes because of seek") \ - M(RemoteFSUnusedCancelledPrefetches, "Number of prefetches prending in buffer desctructor") \ + M(RemoteFSUnusedCancelledPrefetches, "Number of prefetches pending in buffer destructor") \ M(RemoteFSPrefetchReads, "Total number of reads from prefecthed buffer") \ M(RemoteFSAsyncBufferReads, "Number of nextImpl() calls for async buffer") \ M(RemoteFSSimpleBufferReads, "Number of nextImpl() calls for non-async buffer") \ diff --git a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 907b75f0417..0325d7f79b7 100644 --- a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -27,7 +27,7 @@ namespace DB namespace ErrorCodes { - extern const int CANNOT_SEEK_THROUGH_FILE; + extern const int LOGICAL_ERROR; } @@ -160,7 +160,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() prefetch_future = {}; - /// TODO: it does not really seem to improve anything to call prefecth() here, + /// TODO: it does not really seem to improve anything to call prefetch() here, /// but it does not make any worse at the same time. /// Need to test, it might be useful because in fact sometimes (minority of cases though) /// we can read without prefetching several times in a row. diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index f01fd9a87a1..bf578373535 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -51,11 +51,11 @@ bool ReadBufferFromS3::nextImpl() { if (last_offset) { - if (static_cast(last_offset) == offset) + if (last_offset == offset) return false; - if (static_cast(last_offset) < offset) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1); + if (last_offset < offset) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, last_offset - 1); } bool next_result = false; @@ -173,8 +173,8 @@ std::unique_ptr ReadBufferFromS3::initialize() if (last_offset) { - if (offset >= static_cast(last_offset)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1); + if (offset >= last_offset) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, last_offset - 1); req.SetRange(fmt::format("bytes={}-{}", offset, last_offset - 1)); LOG_DEBUG(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, last_offset - 1); diff --git a/src/IO/ReadBufferFromS3.h b/src/IO/ReadBufferFromS3.h index 336893ca5b0..fae3938bf72 100644 --- a/src/IO/ReadBufferFromS3.h +++ b/src/IO/ReadBufferFromS3.h @@ -59,7 +59,7 @@ private: ReadSettings read_settings; bool use_external_buffer; - size_t last_offset; + off_t last_offset; }; } diff --git a/src/IO/ReadBufferFromWebServer.cpp b/src/IO/ReadBufferFromWebServer.cpp index 41b4c80b2c8..245364a9896 100644 --- a/src/IO/ReadBufferFromWebServer.cpp +++ b/src/IO/ReadBufferFromWebServer.cpp @@ -23,7 +23,11 @@ namespace ErrorCodes static constexpr size_t HTTP_MAX_TRIES = 10; ReadBufferFromWebServer::ReadBufferFromWebServer( - const String & url_, ContextPtr context_, const ReadSettings & settings_, bool use_external_buffer_, size_t) + const String & url_, + ContextPtr context_, + const ReadSettings & settings_, + bool use_external_buffer_, + size_t last_offset_) : SeekableReadBuffer(nullptr, 0) , log(&Poco::Logger::get("ReadBufferFromWebServer")) , context(context_) @@ -31,6 +35,7 @@ ReadBufferFromWebServer::ReadBufferFromWebServer( , buf_size(settings_.remote_fs_buffer_size) , read_settings(settings_) , use_external_buffer(use_external_buffer_) + , last_offset(last_offset_) { } @@ -41,11 +46,13 @@ std::unique_ptr ReadBufferFromWebServer::initialize() ReadWriteBufferFromHTTP::HTTPHeaderEntries headers; - auto right_offset = read_settings.remote_read_right_offset; - if (right_offset) + if (last_offset) { - headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-{}", offset, right_offset))); - LOG_DEBUG(log, "Reading with range: {}-{}", offset, right_offset); + if (last_offset < offset) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1); + + headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-{}", offset, last_offset - 1))); + LOG_DEBUG(log, "Reading with range: {}-{}", offset, last_offset); } else { @@ -120,6 +127,15 @@ void ReadBufferFromWebServer::initializeWithRetry() bool ReadBufferFromWebServer::nextImpl() { + if (last_offset) + { + if (last_offset == offset) + return false; + + if (last_offset < offset) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyound right offset ({} > {})", offset, last_offset - 1); + } + if (impl) { if (use_external_buffer) diff --git a/src/IO/ReadBufferFromWebServer.h b/src/IO/ReadBufferFromWebServer.h index c4d847b9f39..1ffb8589392 100644 --- a/src/IO/ReadBufferFromWebServer.h +++ b/src/IO/ReadBufferFromWebServer.h @@ -47,6 +47,8 @@ private: ReadSettings read_settings; bool use_external_buffer; + + off_t last_offset = 0; }; } diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h index 0548659e16e..edd5463bd7c 100644 --- a/src/IO/ReadSettings.h +++ b/src/IO/ReadSettings.h @@ -77,8 +77,6 @@ struct ReadSettings size_t remote_fs_read_max_backoff_ms = 10000; size_t remote_fs_read_backoff_max_tries = 4; - size_t remote_read_right_offset = 0; /// Right offset for range reading. - bool http_retriable_read = true; size_t http_max_tries = 1; size_t http_retry_initial_backoff_ms = 100; diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index f28fcb60497..ca44fc2583d 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -265,6 +265,9 @@ namespace detail if (next_callback) next_callback(count()); + if (total_bytes_to_read && bytes_read == total_bytes_to_read.value()) + return false; + if (impl) { if (use_external_buffer) @@ -284,7 +287,7 @@ namespace detail { /** * impl was initialized before, pass position() to it to make - * sure there is no pending data which was not read, becuase + * sure there is no pending data which was not read, because * this branch means we read sequentially. */ if (!working_buffer.empty()) @@ -292,9 +295,6 @@ namespace detail } } - if (total_bytes_to_read && bytes_read == total_bytes_to_read.value()) - return false; - if (impl && !working_buffer.empty()) impl->position() = position(); @@ -322,7 +322,7 @@ namespace detail || (bytes_read && !settings.http_retriable_read)) throw; - LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), e.what()); + LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), "Error: {}, code: {}", e.what(), e.code()); impl.reset(); sleepForMilliseconds(milliseconds_to_wait); diff --git a/src/Storages/MergeTree/MergeTreeReadPool.cpp b/src/Storages/MergeTree/MergeTreeReadPool.cpp index f23c58bc2b3..f9157317d2d 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -42,6 +42,12 @@ MergeTreeReadPool::MergeTreeReadPool( { /// parts don't contain duplicate MergeTreeDataPart's. const auto per_part_sum_marks = fillPerPartInfo(parts_ranges, check_columns_); + auto min_marks_for_concurrent_read = min_marks_for_concurrent_read_; + if (stored_on_remote_disk) + { + do_not_steal_tasks = true; + min_marks_for_concurrent_read = std::max(min_marks_for_concurrent_read, sum_marks_ / threads_); + } fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read_); } @@ -89,8 +95,11 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, auto & part = parts_with_idx[part_idx]; auto & marks_in_part = thread_tasks.sum_marks_in_parts.back(); - /// Get whole part to read if it is small enough. - auto need_marks = std::min(marks_in_part, min_marks_to_read); + size_t need_marks; + if (stored_on_remote_disk) /// For better performance with remote disks + need_marks = marks_in_part; + else /// Get whole part to read if it is small enough. + need_marks = std::min(marks_in_part, min_marks_to_read); /// Do not leave too little rows in part for next time. if (marks_in_part > need_marks && @@ -223,7 +232,7 @@ std::vector MergeTreeReadPool::fillPerPartInfo( /// Turn off tasks stealing in case there is remote disk. if (part.data_part->isStoredOnRemoteDisk()) - do_not_steal_tasks = true; + stored_on_remote_disk = true; /// Read marks for every data part. size_t sum_marks = 0; diff --git a/src/Storages/MergeTree/MergeTreeReadPool.h b/src/Storages/MergeTree/MergeTreeReadPool.h index 9949bdf86f8..db04208aa7a 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.h +++ b/src/Storages/MergeTree/MergeTreeReadPool.h @@ -139,6 +139,8 @@ private: mutable std::mutex mutex; Poco::Logger * log = &Poco::Logger::get("MergeTreeReadPool"); + + bool stored_on_remote_disk = false; }; using MergeTreeReadPoolPtr = std::shared_ptr;