diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp
index 46f18d32bf1..5d629a75259 100644
--- a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp
+++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp
@@ -567,28 +567,31 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
 
             ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);
 
-            if (file_segment->reserve(current_predownload_size))
+            bool continue_predownload = file_segment->reserve(current_predownload_size);
+            if (continue_predownload)
             {
                 LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size);
 
                 assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
 
-                Stopwatch watch(CLOCK_MONOTONIC);
+                bool success = writeCache(implementation_buffer->buffer().begin(), current_predownload_size, current_offset, *file_segment);
+                if (success)
+                {
+                    current_offset += current_predownload_size;
 
-                file_segment->write(implementation_buffer->buffer().begin(), current_predownload_size, current_offset);
+                    bytes_to_predownload -= current_predownload_size;
+                    implementation_buffer->position() += current_predownload_size;
+                }
+                else
+                {
+                    read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
+                    file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
 
-                watch.stop();
-                auto elapsed = watch.elapsedMicroseconds();
-                current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
-                ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
-                ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, current_predownload_size);
-
-                current_offset += current_predownload_size;
-
-                bytes_to_predownload -= current_predownload_size;
-                implementation_buffer->position() += current_predownload_size;
+                    continue_predownload = false;
+                }
             }
-            else
+
+            if (!continue_predownload)
             {
                 /// We were predownloading:
                 /// segment{1}
@@ -691,6 +694,34 @@ bool CachedReadBufferFromRemoteFS::updateImplementationBufferIfNeeded()
     return true;
 }
 
+bool CachedReadBufferFromRemoteFS::writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment)
+{
+    Stopwatch watch(CLOCK_MONOTONIC);
+
+    try
+    {
+        file_segment.write(data, size, offset);
+    }
+    catch (ErrnoException & e)
+    {
+        int code = e.getErrno();
+        if (code == /* No space left on device */28 || code == /* Quota exceeded */122)
+        {
+            LOG_INFO(log, "Not enough disk space to write cache, will skip cache download. ({})", e.displayText());
+            return false;
+        }
+        throw;
+    }
+
+    watch.stop();
+    auto elapsed = watch.elapsedMicroseconds();
+    current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
+    ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
+    ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
+
+    return true;
+}
+
 bool CachedReadBufferFromRemoteFS::nextImpl()
 {
     try
@@ -840,33 +871,34 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
         {
             assert(file_offset_of_buffer_end + size - 1 <= file_segment->range().right);
 
-            if (file_segment->reserve(size))
+            bool success = file_segment->reserve(size);
+            if (success)
             {
                 assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
 
-                Stopwatch watch(CLOCK_MONOTONIC);
-
-                file_segment->write(
-                    needed_to_predownload ? implementation_buffer->position() : implementation_buffer->buffer().begin(),
-                    size,
-                    file_offset_of_buffer_end);
-
-                watch.stop();
-                auto elapsed = watch.elapsedMicroseconds();
-                current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
-                ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
-                ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
-
-                assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
-                assert(
-                    std::next(current_file_segment_it) == file_segments_holder->file_segments.end()
-                    || file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
+                success = writeCache(implementation_buffer->position(), size, file_offset_of_buffer_end, *file_segment);
+                if (success)
+                {
+                    assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
+                    assert(
+                        std::next(current_file_segment_it) == file_segments_holder->file_segments.end()
+                        || file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
+                }
+                else
+                {
+                    assert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
+                }
             }
             else
             {
-                download_current_segment = false;
-                file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
                 LOG_DEBUG(log, "No space left in cache, will continue without cache download");
+                file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
+            }
+
+            if (!success)
+            {
+                read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
+                download_current_segment = false;
             }
         }
 
diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.h b/src/Disks/IO/CachedReadBufferFromRemoteFS.h
index 867b8538260..aff29dd200c 100644
--- a/src/Disks/IO/CachedReadBufferFromRemoteFS.h
+++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.h
@@ -73,10 +73,13 @@ private:
     SeekableReadBufferPtr getRemoteFSReadBuffer(FileSegmentPtr & file_segment, ReadType read_type_);
 
     size_t getTotalSizeToRead();
+
    bool completeFileSegmentAndGetNext();
 
     void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
 
+    bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);
+
     Poco::Logger * log;
     IFileCache::Key cache_key;
     String remote_fs_object_path;
diff --git a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
index 2f1b8275a0b..765ccb9ac7c 100644
--- a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
+++ b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
@@ -27,6 +27,16 @@
                 <s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
                 <data_cache_enabled>1</data_cache_enabled>
             </s3_with_cache>
+            <s3_with_cache_and_jbod>
+                <type>s3</type>
+                <endpoint>http://minio1:9001/root/data/</endpoint>
+                <access_key_id>minio</access_key_id>
+                <secret_access_key>minio123</secret_access_key>
+                <s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
+                <data_cache_enabled>1</data_cache_enabled>
+                <data_cache_path>/jbod1/</data_cache_path>
+                <data_cache_max_size>1000000000</data_cache_max_size>
+            </s3_with_cache_and_jbod>
         </disks>
         <policies>
             <s3>
@@ -53,6 +63,13 @@
                     </main>
                 </volumes>
             </s3_with_cache>
+            <s3_with_cache_and_jbod>
+                <volumes>
+                    <main>
+                        <disk>s3_with_cache_and_jbod</disk>
+                    </main>
+                </volumes>
+            </s3_with_cache_and_jbod>
         </policies>
     </storage_configuration>
 </clickhouse>
diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py
index f5a9bf153b7..64d70322a9c 100644
--- a/tests/integration/test_merge_tree_s3/test.py
+++ b/tests/integration/test_merge_tree_s3/test.py
@@ -26,6 +26,18 @@ def cluster():
             ],
             with_minio=True,
         )
+
+        cluster.add_instance(
+            "node_with_limited_disk",
+            main_configs=[
+                "configs/config.d/storage_conf.xml",
+                "configs/config.d/bg_processing_pool_conf.xml",
+            ],
+            with_minio=True,
+            tmpfs=[
+                "/jbod1:size=2M",
+            ],
+        )
         logging.info("Starting cluster...")
         cluster.start()
         logging.info("Cluster started")
@@ -666,3 +678,22 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name):
     minio = cluster.minio_client
     for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
         minio.remove_object(cluster.minio_bucket, obj.object_name)
+
+
+@pytest.mark.parametrize("node_name", ["node_with_limited_disk"])
+def test_cache_with_full_disk_space(cluster, node_name):
+    node = cluster.instances[node_name]
+    node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
+    node.query(
+        "CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_with_cache_and_jbod';"
+    )
+    node.query(
+        "INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 500000"
+    )
+    node.query(
+        "SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value FORMAT Null"
+    )
+    assert node.contains_in_log(
+        "Not enough disk space to write cache, will skip cache download"
+    )
+    node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
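---

Note: the core of the change is the errno handling in the new writeCache(). FileSegment::write() surfaces a failed disk write as an ErrnoException; writeCache() treats exactly two errno values as recoverable, 28 (ENOSPC, no space left on device) and 122 (EDQUOT, disk quota exceeded), and rethrows anything else. On the recoverable path the read is not aborted: the caller switches to REMOTE_FS_READ_BYPASS_CACHE and completes the segment as PARTIALLY_DOWNLOADED_NO_CONTINUATION, so the query keeps streaming from remote storage and only the cache population is skipped. The integration test provokes this by mounting the cache path /jbod1 on a 2M tmpfs, so cache writes run out of space almost immediately while the SELECT itself must still succeed.

A minimal, self-contained sketch of the same fallback pattern; tryWriteCache and writeToCacheFile are hypothetical stand-ins for writeCache() and FileSegment::write(), and it uses the ENOSPC/EDQUOT constants from <cerrno> (which equal the hard-coded 28 and 122 on Linux):

    #include <cerrno>
    #include <cstddef>
    #include <cstdio>
    #include <system_error>

    enum class ReadType { REMOTE_FS_READ_AND_PUT_IN_CACHE, REMOTE_FS_READ_BYPASS_CACHE };

    /// Stand-in for FileSegment::write(): simulates a full cache disk by
    /// throwing an exception that carries the errno of the failed write.
    void writeToCacheFile(const char * /*data*/, size_t /*size*/, size_t /*offset*/)
    {
        throw std::system_error(ENOSPC, std::generic_category(), "Cannot write to cache file");
    }

    /// Returns true if the bytes were cached; false if caching must be
    /// skipped because the disk is full (ENOSPC) or over quota (EDQUOT).
    /// Any other error is rethrown, as in writeCache() above.
    bool tryWriteCache(const char * data, size_t size, size_t offset)
    {
        try
        {
            writeToCacheFile(data, size, offset);
            return true;
        }
        catch (const std::system_error & e)
        {
            const int code = e.code().value();
            if (code == ENOSPC || code == EDQUOT)
            {
                std::fprintf(stderr, "Not enough disk space to write cache, will skip cache download. (%s)\n", e.what());
                return false;
            }
            throw;
        }
    }

    int main()
    {
        char buf[16] = {};
        auto read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;

        /// Mirrors the patched nextImplStep(): a failed cache write only
        /// downgrades the read to bypass-cache mode; it never fails the read.
        if (!tryWriteCache(buf, sizeof(buf), 0))
            read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;

        return read_type == ReadType::REMOTE_FS_READ_BYPASS_CACHE ? 0 : 1;
    }

One design note: both call sites first reserve() space in the cache's own accounting and only then write. A reserve() failure was already handled (the existing "No space left in cache" branch), so the new code covers the distinct case where the cache believes it has space but the underlying filesystem does not.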