Process no disk space left

:wq# Please enter the commit message for your changes. Lines starting
This commit is contained in:
kssenii 2022-07-11 18:25:28 +02:00
parent ae46bf985e
commit 112a764794
4 changed files with 117 additions and 34 deletions

View File

@ -567,28 +567,31 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);
if (file_segment->reserve(current_predownload_size))
bool continue_predownload = file_segment->reserve(current_predownload_size);
if (continue_predownload)
{
LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size);
assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
Stopwatch watch(CLOCK_MONOTONIC);
bool success = writeCache(implementation_buffer->buffer().begin(), current_predownload_size, current_offset, *file_segment);
if (success)
{
current_offset += current_predownload_size;
file_segment->write(implementation_buffer->buffer().begin(), current_predownload_size, current_offset);
bytes_to_predownload -= current_predownload_size;
implementation_buffer->position() += current_predownload_size;
}
else
{
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
watch.stop();
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, current_predownload_size);
current_offset += current_predownload_size;
bytes_to_predownload -= current_predownload_size;
implementation_buffer->position() += current_predownload_size;
continue_predownload = false;
}
}
else
if (!continue_predownload)
{
/// We were predownloading:
/// segment{1}
@ -691,6 +694,34 @@ bool CachedReadBufferFromRemoteFS::updateImplementationBufferIfNeeded()
return true;
}
bool CachedReadBufferFromRemoteFS::writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment)
{
Stopwatch watch(CLOCK_MONOTONIC);
try
{
file_segment.write(data, size, offset);
}
catch (ErrnoException & e)
{
int code = e.getErrno();
if (code == /* No space left on device */28 || code == /* Quota exceeded */122)
{
LOG_INFO(log, "Not enough disk space to write cache, will skip cache download. ({})", e.displayText());
return false;
}
throw;
}
watch.stop();
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
return true;
}
bool CachedReadBufferFromRemoteFS::nextImpl()
{
try
@ -840,33 +871,34 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
{
assert(file_offset_of_buffer_end + size - 1 <= file_segment->range().right);
if (file_segment->reserve(size))
bool success = file_segment->reserve(size);
if (success)
{
assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
Stopwatch watch(CLOCK_MONOTONIC);
file_segment->write(
needed_to_predownload ? implementation_buffer->position() : implementation_buffer->buffer().begin(),
size,
file_offset_of_buffer_end);
watch.stop();
auto elapsed = watch.elapsedMicroseconds();
current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
assert(
std::next(current_file_segment_it) == file_segments_holder->file_segments.end()
|| file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
success = writeCache(implementation_buffer->position(), size, file_offset_of_buffer_end, *file_segment);
if (success)
{
assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
assert(
std::next(current_file_segment_it) == file_segments_holder->file_segments.end()
|| file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
}
else
{
assert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
}
}
else
{
download_current_segment = false;
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_DEBUG(log, "No space left in cache, will continue without cache download");
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
}
if (!success)
{
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
download_current_segment = false;
}
}

View File

@ -73,10 +73,13 @@ private:
SeekableReadBufferPtr getRemoteFSReadBuffer(FileSegmentPtr & file_segment, ReadType read_type_);
size_t getTotalSizeToRead();
bool completeFileSegmentAndGetNext();
void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);
Poco::Logger * log;
IFileCache::Key cache_key;
String remote_fs_object_path;

View File

@ -27,6 +27,16 @@
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
<data_cache_enabled>1</data_cache_enabled>
</s3_with_cache>
<s3_with_cache_and_jbod>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
<data_cache_enabled>1</data_cache_enabled>
<data_cache_path>/jbod1/</data_cache_path>
<data_cache_max_size>1000000000</data_cache_max_size>
</s3_with_cache_and_jbod>
</disks>
<policies>
<s3>
@ -53,6 +63,13 @@
</main>
</volumes>
</s3_cache>
<s3_with_cache_and_jbod>
<volumes>
<main>
<disk>s3_with_cache_and_jbod</disk>
</main>
</volumes>
</s3_with_cache_and_jbod>
</policies>
</storage_configuration>

View File

@ -26,6 +26,18 @@ def cluster():
],
with_minio=True,
)
cluster.add_instance(
"node_with_limited_disk",
main_configs=[
"configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
],
with_minio=True,
tmpfs=[
"/jbod1:size=2M",
],
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@ -666,3 +678,22 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name):
minio = cluster.minio_client
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@pytest.mark.parametrize("node_name", ["node_with_limited_disk"])
def test_cache_with_full_disk_space(cluster, node_name):
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
node.query(
"CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_with_cache_and_jbod';"
)
node.query(
"INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 500000"
)
node.query(
"SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value FORMAT Null"
)
assert node.contains_in_log(
"Not enough disk space to write cache, will skip cache download"
)
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")