From 84490d519899541b3755da844fbd04939882a370 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 28 Apr 2022 18:32:59 +0200 Subject: [PATCH 1/2] Fix --- ...chronousReadIndirectBufferFromRemoteFS.cpp | 10 +++- src/Disks/IO/ReadBufferFromRemoteFSGather.cpp | 2 +- .../MergeTree/MergeTreeReaderStream.cpp | 56 ++++--------------- 3 files changed, 21 insertions(+), 47 deletions(-) diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 60620dd4159..cbfa118e2a6 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -138,7 +138,10 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch() void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t position) { if (prefetch_future.valid()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Prefetch is valid in readUntilPosition"); + { + prefetch_future.wait(); + prefetch_future = {}; + } if (position > read_until_position) { @@ -151,7 +154,10 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilEnd() { if (prefetch_future.valid()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Prefetch is valid in readUntilEnd"); + { + prefetch_future.wait(); + prefetch_future = {}; + } read_until_position = impl->getFileSize(); impl->setReadUntilPosition(*read_until_position); diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 56720c2af2b..1df521d021a 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -277,7 +277,7 @@ String ReadBufferFromRemoteFSGather::getInfoForLog() size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const { if (!current_buf) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer not initialized"); + return file_offset_of_buffer_end; return current_buf->getFileOffsetOfBufferEnd(); } diff --git a/src/Storages/MergeTree/MergeTreeReaderStream.cpp b/src/Storages/MergeTree/MergeTreeReaderStream.cpp index b337bd62dd3..7e3eac4eb84 100644 --- a/src/Storages/MergeTree/MergeTreeReaderStream.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderStream.cpp @@ -126,55 +126,23 @@ size_t MergeTreeReaderStream::getRightOffset(size_t right_mark_non_included) auto right_mark = marks_loader.getMark(right_mark_non_included); result_right_offset = right_mark.offset_in_compressed_file; - bool need_to_check_marks_from_the_right = false; - - /// If the end of range is inside the block, we will need to read it too. - if (right_mark.offset_in_decompressed_block > 0) - { - need_to_check_marks_from_the_right = true; - } - else - { - size_t right_mark_included = right_mark_non_included - 1; - const MarkInCompressedFile & right_mark_included_in_file = marks_loader.getMark(right_mark_included); - - /// Also, in LowCardinality dictionary several consecutive marks can point to - /// the same offset. So to get true bytes offset we have to get first - /// non-equal mark. - /// Example: - /// Mark 186, points to [2003111, 0] - /// Mark 187, points to [2003111, 0] - /// Mark 188, points to [2003111, 0] <--- for example need to read until 188 - /// Mark 189, points to [2003111, 0] <--- not suitable, because have same offset - /// Mark 190, points to [2003111, 0] - /// Mark 191, points to [2003111, 0] - /// Mark 192, points to [2081424, 0] <--- what we are looking for - /// Mark 193, points to [2081424, 0] - /// Mark 194, points to [2081424, 0] - if (right_mark_included_in_file.offset_in_compressed_file == result_right_offset) - need_to_check_marks_from_the_right = true; - } - /// Let's go to the right and find mark with bigger offset in compressed file - if (need_to_check_marks_from_the_right) + bool found_bigger_mark = false; + for (size_t i = right_mark_non_included + 1; i < marks_count; ++i) { - bool found_bigger_mark = false; - for (size_t i = right_mark_non_included + 1; i < marks_count; ++i) + const auto & candidate_mark = marks_loader.getMark(i); + if (result_right_offset < candidate_mark.offset_in_compressed_file) { - const auto & candidate_mark = marks_loader.getMark(i); - if (result_right_offset < candidate_mark.offset_in_compressed_file) - { - result_right_offset = candidate_mark.offset_in_compressed_file; - found_bigger_mark = true; - break; - } + result_right_offset = candidate_mark.offset_in_compressed_file; + found_bigger_mark = true; + break; } + } - if (!found_bigger_mark) - { - /// If there are no marks after the end of range, just use file size - result_right_offset = file_size; - } + if (!found_bigger_mark) + { + /// If there are no marks after the end of range, just use file size + result_right_offset = file_size; } } else if (right_mark_non_included == 0) From c31aae0ef49ea0ecd6823b1009471cb1cc1f4f8f Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 29 Apr 2022 01:17:11 +0200 Subject: [PATCH 2/2] Update test --- tests/queries/1_stateful/00167_read_bytes_from_fs.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/queries/1_stateful/00167_read_bytes_from_fs.sql b/tests/queries/1_stateful/00167_read_bytes_from_fs.sql index ac20e60b177..7b3f50f8141 100644 --- a/tests/queries/1_stateful/00167_read_bytes_from_fs.sql +++ b/tests/queries/1_stateful/00167_read_bytes_from_fs.sql @@ -1,3 +1,5 @@ +-- Tags: no-random-settings + SELECT sum(cityHash64(*)) FROM test.hits SETTINGS max_threads=40; -- We had a bug which lead to additional compressed data read. test.hits compressed size is about 1.2Gb, but we read more then 3Gb.