Merge pull request #39106 from kssenii/cache-cacth-disk-full-on-cache-write

fs cache: continue without exception when getting no disk space left error

Commit: 61ce5161f3
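The change catches "no space left" errors during filesystem-cache writes and degrades to reading without the cache instead of failing the whole query. As a minimal, self-contained sketch of that idea (simplified names; writeToCache and std::system_error here stand in for ClickHouse's writeCache and ErrnoException, which the diff below shows):

// Minimal sketch of the pattern this PR introduces, NOT the ClickHouse API:
// a cache write that fails with "no space left" becomes a soft failure that
// the caller answers by bypassing the cache; any other error still throws.
#include <cerrno>
#include <cstddef>
#include <cstdio>
#include <system_error>

// Stand-in for CachedReadBufferFromRemoteFS::writeCache(); the real method
// catches ClickHouse's ErrnoException instead of std::system_error.
bool writeToCache(const char * /*data*/, size_t size)
{
    try
    {
        // Simulate the underlying cache file write running out of disk space.
        if (size > 0)
            throw std::system_error(ENOSPC, std::generic_category());
    }
    catch (const std::system_error & e)
    {
        int code = e.code().value();
        if (code == ENOSPC || code == EDQUOT) // out of space / over quota
        {
            std::fprintf(stderr, "Insert into cache is skipped due to insufficient disk space. (%s)\n", e.what());
            return false; // soft failure: the read itself can still proceed
        }
        throw; // every other errno remains fatal
    }
    return true;
}

int main()
{
    const char data[] = "abc";
    if (!writeToCache(data, sizeof(data)))
        std::puts("continuing in bypass-cache mode"); // what the diff calls REMOTE_FS_READ_BYPASS_CACHE
    return 0;
}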
@@ -567,28 +567,31 @@ void CachedReadBufferFromRemoteFS::predownload(FileSegmentPtr & file_segment)
             ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);
 
-            if (file_segment->reserve(current_predownload_size))
+            bool continue_predownload = file_segment->reserve(current_predownload_size);
+            if (continue_predownload)
             {
                 LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size);
 
                 assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
 
-                Stopwatch watch(CLOCK_MONOTONIC);
-
-                file_segment->write(implementation_buffer->buffer().begin(), current_predownload_size, current_offset);
-
-                watch.stop();
-                auto elapsed = watch.elapsedMicroseconds();
-                current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
-                ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
-                ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, current_predownload_size);
-
-                current_offset += current_predownload_size;
-
-                bytes_to_predownload -= current_predownload_size;
-                implementation_buffer->position() += current_predownload_size;
+                bool success = writeCache(implementation_buffer->buffer().begin(), current_predownload_size, current_offset, *file_segment);
+                if (success)
+                {
+                    current_offset += current_predownload_size;
+
+                    bytes_to_predownload -= current_predownload_size;
+                    implementation_buffer->position() += current_predownload_size;
+                }
+                else
+                {
+                    read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
+                    file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
+
+                    continue_predownload = false;
+                }
             }
-            else
+
+            if (!continue_predownload)
             {
                 /// We were predownloading:
                 ///                   segment{1}
@@ -691,6 +694,34 @@ bool CachedReadBufferFromRemoteFS::updateImplementationBufferIfNeeded()
     return true;
 }
 
+bool CachedReadBufferFromRemoteFS::writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment)
+{
+    Stopwatch watch(CLOCK_MONOTONIC);
+
+    try
+    {
+        file_segment.write(data, size, offset);
+    }
+    catch (ErrnoException & e)
+    {
+        int code = e.getErrno();
+        if (code == /* No space left on device */28 || code == /* Quota exceeded */122)
+        {
+            LOG_INFO(log, "Insert into cache is skipped due to insufficient disk space. ({})", e.displayText());
+            return false;
+        }
+        throw;
+    }
+
+    watch.stop();
+    auto elapsed = watch.elapsedMicroseconds();
+    current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
+    ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
+    ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
+
+    return true;
+}
+
 bool CachedReadBufferFromRemoteFS::nextImpl()
 {
     try
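The literals 28 and 122 in writeCache above are the Linux errno values that <cerrno> names ENOSPC and EDQUOT; an equivalent predicate with the named constants (a sketch, not the committed code) would be:

#include <cerrno>

// Same check as in writeCache, using named errno constants; the numeric
// values (28 and 122) are the Linux ones the diff hard-codes.
bool isOutOfSpaceError(int code)
{
    return code == ENOSPC   // "No space left on device"
        || code == EDQUOT;  // "Disk quota exceeded"
}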
@@ -840,33 +871,34 @@ bool CachedReadBufferFromRemoteFS::nextImplStep()
         {
             assert(file_offset_of_buffer_end + size - 1 <= file_segment->range().right);
 
-            if (file_segment->reserve(size))
+            bool success = file_segment->reserve(size);
+            if (success)
             {
                 assert(file_segment->getDownloadOffset() == static_cast<size_t>(implementation_buffer->getPosition()));
 
-                Stopwatch watch(CLOCK_MONOTONIC);
-
-                file_segment->write(
-                    needed_to_predownload ? implementation_buffer->position() : implementation_buffer->buffer().begin(),
-                    size,
-                    file_offset_of_buffer_end);
-
-                watch.stop();
-                auto elapsed = watch.elapsedMicroseconds();
-                current_file_segment_counters.increment(ProfileEvents::FileSegmentCacheWriteMicroseconds, elapsed);
-                ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteMicroseconds, elapsed);
-                ProfileEvents::increment(ProfileEvents::CachedReadBufferCacheWriteBytes, size);
-
-                assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
-                assert(
-                    std::next(current_file_segment_it) == file_segments_holder->file_segments.end()
-                    || file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
+                success = writeCache(implementation_buffer->position(), size, file_offset_of_buffer_end, *file_segment);
+                if (success)
+                {
+                    assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1);
+                    assert(
+                        std::next(current_file_segment_it) == file_segments_holder->file_segments.end()
+                        || file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd());
+                }
+                else
+                {
+                    assert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
+                }
             }
             else
             {
-                download_current_segment = false;
-                file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
                 LOG_DEBUG(log, "No space left in cache, will continue without cache download");
+                file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
             }
+
+            if (!success)
+            {
+                read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
+                download_current_segment = false;
+            }
         }
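Both call sites (predownload earlier and nextImplStep here) now follow the same shape: reserve cache space, attempt the cache write, and on a soft failure flip the read into bypass mode rather than throwing. A condensed sketch of that control flow, with hypothetical stand-in types rather than the real FileSegment API:

#include <cstddef>

// Hypothetical stand-ins for the real ClickHouse types, just to show the flow.
enum class ReadType { CACHED, REMOTE_FS_READ_BYPASS_CACHE };

struct Segment
{
    bool reserve(size_t) { return true; }                    // may fail if the cache is full
    bool writeCache(const char *, size_t) { return false; }  // false = out of disk space
};

// Mirrors the shape of the nextImplStep() change: a failed reserve or a failed
// write both leave the query running; only the caching is abandoned.
void downloadToCache(Segment & segment, const char * data, size_t size,
                     ReadType & read_type, bool & download_current_segment)
{
    bool success = segment.reserve(size);
    if (success)
        success = segment.writeCache(data, size);

    if (!success)
    {
        read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
        download_current_segment = false;
    }
}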
@@ -73,10 +73,13 @@ private:
     SeekableReadBufferPtr getRemoteFSReadBuffer(FileSegmentPtr & file_segment, ReadType read_type_);
 
     size_t getTotalSizeToRead();
 
     bool completeFileSegmentAndGetNext();
 
     void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
 
+    bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);
+
     Poco::Logger * log;
     IFileCache::Key cache_key;
     String remote_fs_object_path;
@@ -34,6 +34,16 @@
             <s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
             <data_cache_enabled>1</data_cache_enabled>
         </s3_with_cache>
+        <s3_with_cache_and_jbod>
+            <type>s3</type>
+            <endpoint>http://minio1:9001/root/data/</endpoint>
+            <access_key_id>minio</access_key_id>
+            <secret_access_key>minio123</secret_access_key>
+            <s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
+            <data_cache_enabled>1</data_cache_enabled>
+            <data_cache_path>/jbod1/</data_cache_path>
+            <data_cache_max_size>1000000000</data_cache_max_size>
+        </s3_with_cache_and_jbod>
     </disks>
     <policies>
         <s3>
@@ -67,6 +77,13 @@
                 </main>
             </volumes>
         </s3_cache>
+        <s3_with_cache_and_jbod>
+            <volumes>
+                <main>
+                    <disk>s3_with_cache_and_jbod</disk>
+                </main>
+            </volumes>
+        </s3_with_cache_and_jbod>
     </policies>
 </storage_configuration>
@@ -26,6 +26,18 @@ def cluster():
            ],
            with_minio=True,
        )
+
+        cluster.add_instance(
+            "node_with_limited_disk",
+            main_configs=[
+                "configs/config.d/storage_conf.xml",
+                "configs/config.d/bg_processing_pool_conf.xml",
+            ],
+            with_minio=True,
+            tmpfs=[
+                "/jbod1:size=2M",
+            ],
+        )
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")
@@ -678,3 +690,22 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name):
     minio = cluster.minio_client
     for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
         minio.remove_object(cluster.minio_bucket, obj.object_name)
+
+
+@pytest.mark.parametrize("node_name", ["node_with_limited_disk"])
+def test_cache_with_full_disk_space(cluster, node_name):
+    node = cluster.instances[node_name]
+    node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
+    node.query(
+        "CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_with_cache_and_jbod';"
+    )
+    node.query(
+        "INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 500000"
+    )
+    node.query(
+        "SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value FORMAT Null"
+    )
+    assert node.contains_in_log(
+        "Insert into cache is skipped due to insufficient disk space"
+    )
+    node.query("DROP TABLE IF EXISTS s3_test NO DELAY")