Merge pull request #41330 from kssenii/fix-composable-cache-setting-incompatibility

Fix incompatibility of cache after switching setting `do_not_evict_index_and_mark_files` from 1 to 0 and from 0 to 1
Kseniia Sumarokova, 2022-09-15 12:00:22 +02:00, committed by GitHub
commit 69f38a6228
4 changed files with 110 additions and 13 deletions
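The failure mode behind this fix: the on-disk name of a cached file segment encodes whether the segment is "persistent" (the integration test below counts cache paths matching LIKE '%persistent'), and before this change the read path recomputed that flag from the current value of `do_not_evict_index_and_mark_files` rather than from the segment's own in-memory state. After toggling the setting and restarting, the recomputed path could therefore name a file written under the other suffix, producing the "No such file or directory" errors the test checks for. A minimal sketch of the mismatch, assuming an illustrative suffix-based path scheme (not the exact ClickHouse layout):

#include <filesystem>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

/// Illustrative path scheme, assumed for this sketch (inferred from the
/// test's "LIKE '%persistent'" checks): persistent segments get a suffix.
static fs::path segmentPath(const fs::path & base, const std::string & key,
                            size_t offset, bool is_persistent)
{
    return base / key / (std::to_string(offset) + (is_persistent ? "_persistent" : ""));
}

struct FileSegment
{
    std::string key;
    size_t offset;
    bool is_persistent; /// recorded at write time; matches the on-disk name
};

int main()
{
    const fs::path base = "/s3_cache_r";
    /// Segment written while do_not_evict_index_and_mark_files = 1.
    FileSegment segment{"deadbeef", 0, true};

    /// Setting later switched to 0.
    bool setting_now = false;

    /// Before the fix: flag recomputed from the current setting.
    auto broken = segmentPath(base, segment.key, segment.offset, setting_now);
    /// After the fix: flag taken from the segment's in-memory state.
    auto fixed = segmentPath(base, segment.key, segment.offset, segment.is_persistent);

    std::cout << broken << '\n'  // "/s3_cache_r/deadbeef/0" -- no such file
              << fixed << '\n';  // "/s3_cache_r/deadbeef/0_persistent" -- exists
}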

src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp

@@ -143,9 +143,11 @@ void CachedOnDiskReadBufferFromFile::initialize(size_t offset, size_t size)
 }
 
 CachedOnDiskReadBufferFromFile::ImplementationBufferPtr
-CachedOnDiskReadBufferFromFile::getCacheReadBuffer(size_t offset) const
+CachedOnDiskReadBufferFromFile::getCacheReadBuffer(const FileSegment & file_segment) const
 {
-    auto path = cache->getPathInLocalCache(cache_key, offset, is_persistent);
+    /// Use is_persistent flag from in-memory state of the filesegment,
+    /// because it is consistent with what is written on disk.
+    auto path = file_segment.getPathInLocalCache();
 
     ReadSettings local_read_settings{settings};
     /// Do not allow to use asynchronous version of LocalFSReadMethod.
@@ -237,8 +239,6 @@ bool CachedOnDiskReadBufferFromFile::canStartFromCache(size_t current_offset, const FileSegment & file_segment)
 
 CachedOnDiskReadBufferFromFile::ImplementationBufferPtr
 CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
 {
-    auto range = file_segment->range();
-
     auto download_state = file_segment->state();
 
     LOG_TEST(log, "getReadBufferForFileSegment: {}", file_segment->getInfoForLog());
@@ -247,7 +247,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
     if (download_state == FileSegment::State::DOWNLOADED)
     {
         read_type = ReadType::CACHED;
-        return getCacheReadBuffer(range.left);
+        return getCacheReadBuffer(*file_segment);
     }
     else
     {
@@ -280,7 +280,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
                     ///                      file_offset_of_buffer_end
 
                     read_type = ReadType::CACHED;
-                    return getCacheReadBuffer(range.left);
+                    return getCacheReadBuffer(*file_segment);
                 }
 
                 download_state = file_segment->wait();
@@ -289,7 +289,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
             case FileSegment::State::DOWNLOADED:
             {
                 read_type = ReadType::CACHED;
-                return getCacheReadBuffer(range.left);
+                return getCacheReadBuffer(*file_segment);
             }
             case FileSegment::State::EMPTY:
             case FileSegment::State::PARTIALLY_DOWNLOADED:
@@ -305,7 +305,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
                     ///                      file_offset_of_buffer_end
 
                     read_type = ReadType::CACHED;
-                    return getCacheReadBuffer(range.left);
+                    return getCacheReadBuffer(*file_segment);
                 }
 
                 auto downloader_id = file_segment->getOrSetDownloader();
@@ -323,7 +323,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
 
                         read_type = ReadType::CACHED;
                         file_segment->resetDownloader();
-                        return getCacheReadBuffer(range.left);
+                        return getCacheReadBuffer(*file_segment);
                     }
 
                     if (file_segment->getCurrentWriteOffset() < file_offset_of_buffer_end)
@@ -339,7 +339,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
                         LOG_TEST(log, "Predownload. File segment info: {}", file_segment->getInfoForLog());
                        chassert(file_offset_of_buffer_end > file_segment->getCurrentWriteOffset());
                        bytes_to_predownload = file_offset_of_buffer_end - file_segment->getCurrentWriteOffset();
-                        chassert(bytes_to_predownload < range.size());
+                        chassert(bytes_to_predownload < file_segment->range().size());
                     }
 
                     read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
@@ -354,7 +354,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
                     if (canStartFromCache(file_offset_of_buffer_end, *file_segment))
                     {
                         read_type = ReadType::CACHED;
-                        return getCacheReadBuffer(range.left);
+                        return getCacheReadBuffer(*file_segment);
                     }
                     else
                     {
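The shape of the fix above: instead of `getCacheReadBuffer(size_t offset)` rebuilding the path from the buffer's `cache_key` and a buffer-level `is_persistent`, every call site now hands over the `FileSegment` itself, and the segment reports the path it was actually written under. A reduced sketch of the new signature, with stand-in types (the real `FileSegment` and `ReadBufferFromFile` are far richer):

#include <iostream>
#include <memory>
#include <string>

/// Simplified stand-ins for the real ClickHouse types.
struct FileSegment
{
    std::string path; /// recorded at write time, consistent with disk

    /// The segment is the single source of truth for its on-disk location.
    const std::string & getPathInLocalCache() const { return path; }
};

struct ReadBufferFromFile
{
    explicit ReadBufferFromFile(std::string path_) : path(std::move(path_)) {}
    std::string path;
};
using ImplementationBufferPtr = std::shared_ptr<ReadBufferFromFile>;

/// Old shape (for contrast): taking only an offset forced the callee to
/// rebuild the path from cache_key + the *current* setting:
///     ImplementationBufferPtr getCacheReadBuffer(size_t offset) const;

/// New shape: ask the segment, whose state matches what is on disk.
ImplementationBufferPtr getCacheReadBuffer(const FileSegment & file_segment)
{
    return std::make_shared<ReadBufferFromFile>(file_segment.getPathInLocalCache());
}

int main()
{
    FileSegment segment{"/s3_cache_r/key/0_persistent"};
    std::cout << getCacheReadBuffer(segment)->path << '\n';
}

Making the segment the single source of truth is also why the `range` local at the top of `getReadBufferForFileSegment` could be dropped: the one remaining use reads `file_segment->range()` directly.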

src/Disks/IO/CachedOnDiskReadBufferFromFile.h

@@ -68,7 +68,7 @@ private:
     ImplementationBufferPtr getReadBufferForFileSegment(FileSegmentPtr & file_segment);
 
-    ImplementationBufferPtr getCacheReadBuffer(size_t offset) const;
+    ImplementationBufferPtr getCacheReadBuffer(const FileSegment & file_segment) const;
 
     std::optional<size_t> getLastNonDownloadedOffset() const;

configs/config.d/storage_conf.xml (integration test storage configuration)

@@ -38,6 +38,20 @@
             <path>/jbod1/</path>
             <max_size>1000000000</max_size>
         </s3_with_cache_and_jbod>
+        <s3_r>
+            <type>s3</type>
+            <endpoint>http://minio1:9001/root/data/</endpoint>
+            <access_key_id>minio</access_key_id>
+            <secret_access_key>minio123</secret_access_key>
+            <s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
+        </s3_r>
+        <s3_cache_r>
+            <type>cache</type>
+            <disk>s3_r</disk>
+            <path>/s3_cache_r/</path>
+            <max_size>1000000000</max_size>
+            <do_not_evict_index_and_mark_files>1</do_not_evict_index_and_mark_files>
+        </s3_cache_r>
     </disks>
     <policies>
         <s3>
@@ -78,6 +92,13 @@
                 </main>
             </volumes>
         </s3_with_cache_and_jbod>
+        <s3_cache_r>
+            <volumes>
+                <main>
+                    <disk>s3_cache_r</disk>
+                </main>
+            </volumes>
+        </s3_cache_r>
     </policies>
 </storage_configuration>

test.py (integration test)

@@ -6,7 +6,6 @@ import pytest
 
 from helpers.cluster import ClickHouseCluster
 from helpers.utility import generate_values, replace_config, SafeThread
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -36,6 +35,7 @@ def cluster():
                 "/jbod1:size=2M",
             ],
         )
 
         logging.info("Starting cluster...")
         cluster.start()
         logging.info("Cluster started")
@@ -742,3 +742,79 @@ def test_store_cleanup_disk_s3(cluster, node_name):
         "CREATE TABLE s3_test UUID '00000000-1000-4000-8000-000000000001' (n UInt64) Engine=MergeTree() ORDER BY n SETTINGS storage_policy='s3';"
     )
     node.query("INSERT INTO s3_test SELECT 1")
+
+
+@pytest.mark.parametrize("node_name", ["node"])
+def test_cache_setting_compatibility(cluster, node_name):
+    node = cluster.instances[node_name]
+
+    node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
+
+    node.query(
+        "CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_r';"
+    )
+    node.query(
+        "INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 500"
+    )
+
+    result = node.query("SYSTEM DROP FILESYSTEM CACHE")
+
+    result = node.query(
+        "SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
+    )
+    assert int(result) == 0
+
+    node.query("SELECT * FROM s3_test")
+
+    result = node.query(
+        "SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
+    )
+    assert int(result) > 0
+
+    config_path = os.path.join(
+        SCRIPT_DIR,
+        f"./{cluster.instances_dir_name}/node/configs/config.d/storage_conf.xml",
+    )
+
+    replace_config(
+        config_path,
+        "<do_not_evict_index_and_mark_files>1</do_not_evict_index_and_mark_files>",
+        "<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>",
+    )
+
+    result = node.query("DESCRIBE CACHE 's3_cache_r'")
+    assert result.strip().endswith("1")
+
+    node.restart_clickhouse()
+
+    result = node.query("DESCRIBE CACHE 's3_cache_r'")
+    assert result.strip().endswith("0")
+
+    result = node.query(
+        "SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
+    )
+    assert int(result) > 0
+
+    node.query("SELECT * FROM s3_test FORMAT Null")
+    assert not node.contains_in_log("No such file or directory: Cache info:")
+
+    replace_config(
+        config_path,
+        "<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>",
+        "<do_not_evict_index_and_mark_files>1</do_not_evict_index_and_mark_files>",
+    )
+
+    result = node.query(
+        "SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
+    )
+    assert int(result) > 0
+
+    node.restart_clickhouse()
+    result = node.query("DESCRIBE CACHE 's3_cache_r'")
+    assert result.strip().endswith("1")
+
+    node.query("SELECT * FROM s3_test FORMAT Null")
+    assert not node.contains_in_log("No such file or directory: Cache info:")