Fix checks

kssenii 2023-03-27 16:54:40 +02:00
parent 5a4525742d
commit 314ee12442
3 changed files with 63 additions and 26 deletions


@@ -1521,21 +1521,27 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
     for (const auto & disk_ptr : disks)
     {
         defined_disk_names.insert(disk_ptr->getName());
+    }
+    /// In case of delegate disks it is not enough to traverse `disks`,
+    /// because, for example, a cache or encrypted disk wrapping an s3 disk and the s3 disk itself can be put into different storage policies,
+    /// while disk->exists returns the same thing for both disks.
+    for (const auto & [disk_name, disk] : getContext()->getDisksMap())
+    {
         /// As an encrypted disk can use the same path as its nested disk,
         /// we need to take it into account here.
-        const auto & delegate = disk_ptr->getDelegateDiskIfExists();
-        if (delegate && disk_ptr->getPath() == delegate->getPath())
+        const auto & delegate = disk->getDelegateDiskIfExists();
+        if (delegate && disk->getPath() == delegate->getPath())
             defined_disk_names.insert(delegate->getName());
 
-        /// As the cache is implemented on the object storage layer, not on the disk level,
-        /// we can have a structure such as
-        /// DiskObjectStorage(CachedObjectStorage(...(CachedObjectStorage(ObjectStorage))...)),
-        /// where disk_ptr->getName() is the name of the last delegate - ObjectStorage.
-        /// So we also need to add the cache layers to the defined disk names.
-        if (disk_ptr->supportsCache())
+        if (disk->supportsCache())
         {
-            auto caches = disk_ptr->getCacheLayersNames();
+            /// As the cache is implemented on the object storage layer, not on the disk level,
+            /// we can have a structure such as
+            /// DiskObjectStorage(CachedObjectStorage(...(CachedObjectStorage(ObjectStorage))...)),
+            /// where disk->getName() is the name of the last delegate - ObjectStorage.
+            /// So we also need to add the cache layers to the defined disk names.
+            auto caches = disk->getCacheLayersNames();
            defined_disk_names.insert(caches.begin(), caches.end());
         }
     }
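
The logic this hunk fixes can be summarized as: the part-loading sanity check builds a set of every disk name that may legitimately own a data directory, so it must traverse the full disk map, pick up a delegate whenever a wrapper shares its path, and include cache layer names hidden inside the object-storage stack. Below is a minimal Python sketch of that idea (a hypothetical model, not ClickHouse code; Disk, collect_defined_disk_names, and the sample names are invented for illustration):

from dataclasses import dataclass, field

@dataclass
class Disk:
    name: str
    path: str
    delegate: "Disk | None" = None        # e.g. an encrypted disk wrapping s3
    cache_layers: list = field(default_factory=list)

def collect_defined_disk_names(policy_disks, all_disks):
    # Names of disks mentioned directly in the storage policy.
    defined = {d.name for d in policy_disks}
    # Traverse the full disk map, not just the policy's disks: a wrapper
    # and the disk it wraps can live in different storage policies.
    for disk in all_disks:
        if disk.delegate and disk.path == disk.delegate.path:
            defined.add(disk.delegate.name)
        # Cache layers live inside the object storage stack, so their
        # names never show up as top-level disks.
        defined.update(disk.cache_layers)
    return defined

s3 = Disk("disk_s3", "/data/s3/")
encrypted = Disk("disk_s3_encrypted_default_path", "/data/s3/", delegate=s3)
cached = Disk("encrypted_s3_cache", "/data/s3/", delegate=s3, cache_layers=["s3_cache"])

# The policy mentions only a wrapper, yet the nested names are still covered.
print(collect_defined_disk_names([encrypted], [encrypted, cached, s3]))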


@@ -15,7 +15,23 @@
                 <type>encrypted</type>
                 <disk>disk_s3</disk>
                 <key>1234567812345678</key>
+                <path>encrypted/</path>
             </disk_s3_encrypted>
+            <disk_s3_encrypted_default_path>
+                <type>encrypted</type>
+                <disk>disk_s3</disk>
+                <key>1234567812345678</key>
+            </disk_s3_encrypted_default_path>
+            <s3_cache>
+                <disk>disk_s3</disk>
+                <path>s3_cache/</path>
+                <max_size>1Gi</max_size>
+            </s3_cache>
+            <encrypted_s3_cache>
+                <type>encrypted</type>
+                <disk>s3_cache</disk>
+                <key>1234567812345678</key>
+            </encrypted_s3_cache>
             <disk_local_encrypted>
                 <type>encrypted</type>
                 <disk>disk_local</disk>
@@ -73,6 +89,20 @@
                 </external>
             </volumes>
         </s3_policy>
+        <s3_encrypted_default_path>
+            <volumes>
+                <main>
+                    <disk>disk_s3_encrypted_default_path</disk>
+                </main>
+            </volumes>
+        </s3_encrypted_default_path>
+        <s3_encrypted_cache_policy>
+            <volumes>
+                <main>
+                    <disk>encrypted_s3_cache</disk>
+                </main>
+            </volumes>
+        </s3_encrypted_cache_policy>
     </policies>
 </storage_configuration>
</clickhouse>
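
To confirm that the disks and policies added above are actually registered after server start, a check along these lines could be added to the integration test (hypothetical, not part of this commit; it reuses the node fixture from the test file below and queries the real system.disks and system.storage_policies tables):

def check_storage_configuration(node):
    # Disks defined in the XML above should appear in system.disks.
    disks = node.query("SELECT name FROM system.disks FORMAT TSV").split()
    for expected in ("disk_s3_encrypted_default_path", "s3_cache", "encrypted_s3_cache"):
        assert expected in disks, f"disk {expected} is not registered"

    # The new policies should appear in system.storage_policies
    # (one row per volume, hence DISTINCT).
    policies = node.query(
        "SELECT DISTINCT policy_name FROM system.storage_policies FORMAT TSV"
    ).split()
    assert "s3_encrypted_default_path" in policies
    assert "s3_encrypted_cache_policy" in policies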


@@ -273,24 +273,25 @@ def test_read_in_order():
 def test_restart():
-    node.query(
-        """
-        DROP TABLE IF EXISTS encrypted_test;
-        CREATE TABLE encrypted_test (
-            id Int64,
-            data String
-        ) ENGINE=MergeTree()
-        ORDER BY id
-        SETTINGS disk='disk_s3_encrypted'
-        """
-    )
+    for policy in ["disk_s3_encrypted_default_path", "encrypted_s3_cache"]:
+        node.query(
+            f"""
+            DROP TABLE IF EXISTS encrypted_test;
+            CREATE TABLE encrypted_test (
+                id Int64,
+                data String
+            ) ENGINE=MergeTree()
+            ORDER BY id
+            SETTINGS disk='{policy}'
+            """
+        )
-    node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
-    select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
-    assert node.query(select_query) == "(0,'data'),(1,'data')"
+        node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
+        select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
+        assert node.query(select_query) == "(0,'data'),(1,'data')"
 
-    node.restart_clickhouse()
+        node.restart_clickhouse()
 
-    assert node.query(select_query) == "(0,'data'),(1,'data')"
+        assert node.query(select_query) == "(0,'data'),(1,'data')"
 
-    node.query("DROP TABLE IF EXISTS encrypted_test NO DELAY;")
+        node.query("DROP TABLE encrypted_test NO DELAY;")