From 112a7647949d467e4204abaa98b5f123135335b6 Mon Sep 17 00:00:00 2001
From: kssenii
Date: Mon, 11 Jul 2022 18:25:28 +0200
Subject: [PATCH 1/3] Process no disk space left

---
 src/Disks/IO/CachedReadBufferFromRemoteFS.cpp | 100 ++++++++++++------
 src/Disks/IO/CachedReadBufferFromRemoteFS.h   |   3 +
 .../configs/config.d/storage_conf.xml         |  17 +++
 tests/integration/test_merge_tree_s3/test.py  |  31 ++++++
 4 files changed, 117 insertions(+), 34 deletions(-)

diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp
index 46f18d32bf1..5d629a75259 100644
--- a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp
+++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp
@@ -567,28 +567,31 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
 
         ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);
 
-        if (file_segment->reserve(current_predownload_size))
+        bool continue_predownload = file_segment->reserve(current_predownload_size);
+        if (continue_predownload)
         {
             LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size);
 
             assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
 
-            Stopwatch watch(CLOCK_MONOTONIC);
+            bool success = writeCache(implementation_buffer->buffer().begin(), current_predownload_size, current_offset, *file_segment);
+            if (success)
+            {
+                current_offset += current_predownload_size;
 
-            file_segment->write(implementation_buffer->buffer().begin(), current_predownload_size, current_offset);
+                bytes_to_predownload -= current_predownload_size;
+                implementation_buffer->position() += current_predownload_size;
+            }
+            else
+            {
+                read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
+                file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
 
-            watch.stop();
-            auto elapsed = watch.elapsedMicroseconds();
-            current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
-            ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
-            ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, current_predownload_size);
-
-            current_offset += current_predownload_size;
-
-            bytes_to_predownload -= current_predownload_size;
-            implementation_buffer->position() += current_predownload_size;
+                continue_predownload = false;
+            }
         }
-        else
+
+        if (!continue_predownload)
         {
             /// We were predownloading:
             /// segment{1}
@@ -691,6 +694,34 @@ bool CachedReadBufferFromRemoteFS::updateImplementationBufferIfNeeded()
     return true;
 }
 
+bool CachedReadBufferFromRemoteFS::writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment)
+{
+    Stopwatch watch(CLOCK_MONOTONIC);
+
+    try
+    {
+        file_segment.write(data, size, offset);
+    }
+    catch (ErrnoException & e)
+    {
+        int code = e.getErrno();
+        if (code == /* No space left on device */28 || code == /* Quota exceeded */122)
+        {
+            LOG_INFO(log, "Not enough disk space to write cache, will skip cache download. ({})", e.displayText());
+            return false;
+        }
+        throw;
+    }
+
+    watch.stop();
+    auto elapsed = watch.elapsedMicroseconds();
+    current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
+    ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
+    ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
+
+    return true;
+}
+
 bool CachedReadBufferFromRemoteFS::nextImpl()
 {
     try
@@ -840,33 +871,34 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
         {
             assert(file_offset_of_buffer_end + size - 1 <= file_segment->range().right);
 
-            if (file_segment->reserve(size))
+            bool success = file_segment->reserve(size);
+            if (success)
             {
                 assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
 
-                Stopwatch watch(CLOCK_MONOTONIC);
-
-                file_segment->write(
-                    needed_to_predownload ? implementation_buffer->position() : implementation_buffer->buffer().begin(),
-                    size,
-                    file_offset_of_buffer_end);
-
-                watch.stop();
-                auto elapsed = watch.elapsedMicroseconds();
-                current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
-                ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
-                ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
-
-                assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
-                assert(
-                    std::next(current_file_segment_it) == file_segments_holder->file_segments.end()
-                    || file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
+                success = writeCache(implementation_buffer->position(), size, file_offset_of_buffer_end, *file_segment);
+                if (success)
+                {
+                    assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
+                    assert(
+                        std::next(current_file_segment_it) == file_segments_holder->file_segments.end()
+                        || file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
+                }
+                else
+                {
+                    assert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
+                }
             }
             else
             {
-                download_current_segment = false;
-                file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
                 LOG_DEBUG(log, "No space left in cache, will continue without cache download");
+                file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
+            }
+
+            if (!success)
+            {
+                read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
+                download_current_segment = false;
             }
         }
diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.h b/src/Disks/IO/CachedReadBufferFromRemoteFS.h
index 867b8538260..aff29dd200c 100644
--- a/src/Disks/IO/CachedReadBufferFromRemoteFS.h
+++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.h
@@ -73,10 +73,13 @@ private:
     SeekableReadBufferPtr getRemoteFSReadBuffer(FileSegmentPtr & file_segment, ReadType read_type_);
 
     size_t getTotalSizeToRead();
+
     bool completeFileSegmentAndGetNext();
 
     void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
 
+    bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);
+
     Poco::Logger * log;
     IFileCache::Key cache_key;
    String remote_fs_object_path;
diff --git a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
index 2f1b8275a0b..765ccb9ac7c 100644
--- a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
+++ b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
@@ -27,6 +27,16 @@
             <s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
             <data_cache_enabled>1</data_cache_enabled>
         </s3_with_cache>
+        <s3_with_cache_and_jbod>
+            <type>s3</type>
+            <endpoint>http://minio1:9001/root/data/</endpoint>
+            <access_key_id>minio</access_key_id>
+            <secret_access_key>minio123</secret_access_key>
+            <s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
+            <data_cache_enabled>1</data_cache_enabled>
+            <data_cache_path>/jbod1/</data_cache_path>
+            <data_cache_max_size>1000000000</data_cache_max_size>
+        </s3_with_cache_and_jbod>
@@ -53,6 +63,13 @@
+        <s3_with_cache_and_jbod>
+            <volumes>
+                <main>
+                    <disk>s3_with_cache_and_jbod</disk>
+                </main>
+            </volumes>
+        </s3_with_cache_and_jbod>
diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py
index f5a9bf153b7..64d70322a9c 100644
--- a/tests/integration/test_merge_tree_s3/test.py
+++ b/tests/integration/test_merge_tree_s3/test.py
@@ -26,6 +26,18 @@ def cluster():
             ],
             with_minio=True,
         )
+
+        cluster.add_instance(
+            "node_with_limited_disk",
+            main_configs=[
+                "configs/config.d/storage_conf.xml",
+                "configs/config.d/bg_processing_pool_conf.xml",
+            ],
+            with_minio=True,
+            tmpfs=[
+                "/jbod1:size=2M",
+            ],
+        )
         logging.info("Starting cluster...")
         cluster.start()
         logging.info("Cluster started")
@@ -666,3 +678,22 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name):
     minio = cluster.minio_client
     for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
         minio.remove_object(cluster.minio_bucket, obj.object_name)
+
+
+@pytest.mark.parametrize("node_name", ["node_with_limited_disk"])
+def test_cache_with_full_disk_space(cluster, node_name):
+    node = cluster.instances[node_name]
+    node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
+    node.query(
+        "CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_with_cache_and_jbod';"
+    )
+    node.query(
+        "INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 500000"
+    )
+    node.query(
+        "SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value FORMAT Null"
+    )
+    assert node.contains_in_log(
+        "Not enough disk space to write cache, will skip cache download"
+    )
+    node.query("DROP TABLE IF EXISTS s3_test NO DELAY")

From d6c145e5e1aecc60dadf7e792af6d668dc88da0a Mon Sep 17 00:00:00 2001
From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com>
Date: Wed, 13 Jul 2022 11:48:17 +0200
Subject: [PATCH 2/3] Update CachedReadBufferFromRemoteFS.cpp

---
 src/Disks/IO/CachedReadBufferFromRemoteFS.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp
index 5d629a75259..b929cea0236 100644
--- a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp
+++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp
@@ -707,7 +707,7 @@ bool CachedReadBufferFromRemoteFS::writeCache(char * data, size_t size, size_t o
         int code = e.getErrno();
         if (code == /* No space left on device */28 || code == /* Quota exceeded */122)
         {
-            LOG_INFO(log, "Not enough disk space to write cache, will skip cache download. ({})", e.displayText());
+            LOG_INFO(log, "Insert into cache is skipped due to insufficient disk space. ({})", e.displayText());
             return false;
         }
         throw;

From 48cb1ed9094a7ff2e2cba5be57197e07e1707fb8 Mon Sep 17 00:00:00 2001
From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com>
Date: Wed, 13 Jul 2022 11:48:39 +0200
Subject: [PATCH 3/3] Update test.py

---
 tests/integration/test_merge_tree_s3/test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py
index 64d70322a9c..5c3885851fb 100644
--- a/tests/integration/test_merge_tree_s3/test.py
+++ b/tests/integration/test_merge_tree_s3/test.py
@@ -694,6 +694,6 @@ def test_cache_with_full_disk_space(cluster, node_name):
         "SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value FORMAT Null"
     )
     assert node.contains_in_log(
-        "Not enough disk space to write cache, will skip cache download"
+        "Insert into cache is skipped due to insufficient disk space"
     )
     node.query("DROP TABLE IF EXISTS s3_test NO DELAY")