Better delegate disks handle mutual path

kssenii 2023-03-24 16:15:52 +01:00
parent 52541e5e23
commit 16ebfcc3f8
5 changed files with 45 additions and 18 deletions

View File

@@ -276,6 +276,10 @@ public:
         return delegate->getMetadataStorage();
     }
+    DiskPtr getDelegateDiskIfExists() const override
+    {
+        return delegate;
+    }
 private:
     String wrappedPath(const String & path) const

View File

@@ -423,6 +423,8 @@ public:
     void markDiskAsCustom() { is_custom_disk = true; }
+    virtual DiskPtr getDelegateDiskIfExists() const { return nullptr; }
 protected:
     friend class DiskDecorator;
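
Taken together, the two header hunks above introduce the delegate accessor this commit relies on: the base disk interface gains a virtual getDelegateDiskIfExists() that returns nullptr by default, and wrapper disks override it to hand back the disk they wrap. A minimal standalone sketch of that pattern, using simplified stand-in types rather than the real ClickHouse interfaces:

#include <iostream>
#include <memory>
#include <string>
#include <utility>

// Simplified stand-ins for IDisk/DiskPtr; names and members are illustrative only.
struct IDisk;
using DiskPtr = std::shared_ptr<IDisk>;

struct IDisk
{
    virtual ~IDisk() = default;
    virtual std::string getName() const = 0;
    virtual std::string getPath() const = 0;

    /// Plain disks have no delegate; wrapper disks override this.
    virtual DiskPtr getDelegateDiskIfExists() const { return nullptr; }
};

struct PlainDisk : IDisk
{
    std::string name, path;
    PlainDisk(std::string n, std::string p) : name(std::move(n)), path(std::move(p)) {}
    std::string getName() const override { return name; }
    std::string getPath() const override { return path; }
};

/// A decorator-style disk (think of an encrypted disk) that forwards to `delegate`
/// and may share its data path with it.
struct WrappingDisk : IDisk
{
    std::string name;
    DiskPtr delegate;
    WrappingDisk(std::string n, DiskPtr d) : name(std::move(n)), delegate(std::move(d)) {}
    std::string getName() const override { return name; }
    std::string getPath() const override { return delegate->getPath(); }
    DiskPtr getDelegateDiskIfExists() const override { return delegate; }
};

int main()
{
    auto s3 = std::make_shared<PlainDisk>("disk_s3", "/data/s3/");
    auto encrypted = std::make_shared<WrappingDisk>("disk_s3_encrypted", s3);

    if (auto delegate = encrypted->getDelegateDiskIfExists())
        std::cout << encrypted->getName() << " wraps " << delegate->getName()
                  << ", same path: " << (encrypted->getPath() == delegate->getPath()) << '\n';
}

The default-to-nullptr design lets callers probe any disk uniformly and treat it specially only when a delegate actually exists.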

View File

@@ -1517,24 +1517,26 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
     {
         /// Check extra parts at different disks, in order to not allow to miss data parts at undefined disks.
         std::unordered_set<String> defined_disk_names;
-        /// If disk is wrapped into cached disk, it will not be defined in storage policy.
-        std::unordered_set<String> disk_names_wrapped_in_cache;
         for (const auto & disk_ptr : disks)
+        {
             defined_disk_names.insert(disk_ptr->getName());
-        for (const auto & [disk_name, disk_ptr] : getContext()->getDisksMap())
-        {
-            /// In composable cache with the underlying source disk there might the following structure:
+            /// As encrypted disk can use the same path of its nested disk,
+            /// we need to take it into account here.
+            const auto & delegate = disk_ptr->getDelegateDiskIfExists();
+            if (delegate && disk_ptr->getPath() == delegate->getPath())
+                defined_disk_names.insert(delegate->getName());
+            /// As cache is implemented on object storage layer, not on disk level, e.g.
+            /// we have such structure:
             /// DiskObjectStorage(CachedObjectStorage(...(CachedObjectStored(ObjectStorage)...)))
-            /// In configuration file each of these layers has a different name, but data path
-            /// (getPath() result) is the same. We need to take it into account here.
-            if (disk_ptr->supportsCache() && defined_disk_names.contains(disk_ptr->getName()))
+            /// and disk_ptr->getName() here is the name of last delegate - ObjectStorage.
+            /// So now we need to add cache layers to defined disk names.
+            if (disk_ptr->supportsCache())
             {
                 auto caches = disk_ptr->getCacheLayersNames();
-                disk_names_wrapped_in_cache.insert(caches.begin(), caches.end());
-                LOG_TEST(log, "Cache layers for cache disk `{}`, inner disk `{}`: {}",
-                         disk_name, disk_ptr->getName(), fmt::join(caches, ", "));
+                defined_disk_names.insert(caches.begin(), caches.end());
             }
         }
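
The hunk above changes how the set of allowed disk names is built: every storage-policy disk contributes its own name, the name of a delegate that shares the same data path (the encrypted-over-S3 case this commit targets), and the names of any cache layers stacked on top, all into a single defined_disk_names set, so the separate disk_names_wrapped_in_cache set disappears. A rough standalone sketch of that collection step, using a simplified Disk struct instead of ClickHouse's IDisk:

#include <iostream>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

// Minimal stand-in for a disk: just the fields the check needs (illustrative, not ClickHouse's IDisk).
struct Disk
{
    std::string name;
    std::string path;
    std::shared_ptr<Disk> delegate;                  // wrapper disks point at the disk they wrap
    std::vector<std::string> cache_layer_names;      // names of cache layers stacked on top, if any
};

int main()
{
    auto s3 = std::make_shared<Disk>(Disk{"object_storage", "/data/s3/", nullptr, {"s3_cache", "s3_cache_2"}});
    auto encrypted = std::make_shared<Disk>(Disk{"disk_s3_encrypted", "/data/s3/", s3, {}});

    std::vector<std::shared_ptr<Disk>> policy_disks = {encrypted, s3};

    // Collect every disk name that may legitimately own parts under the table's data path.
    std::unordered_set<std::string> defined_disk_names;
    for (const auto & disk : policy_disks)
    {
        defined_disk_names.insert(disk->name);

        // A delegate sharing the same path writes to the same directories, so its name is allowed too.
        if (disk->delegate && disk->path == disk->delegate->path)
            defined_disk_names.insert(disk->delegate->name);

        // Cache layers are configured under their own names but store data through the same disk.
        defined_disk_names.insert(disk->cache_layer_names.begin(), disk->cache_layer_names.end());
    }

    for (const auto & name : defined_disk_names)
        std::cout << name << '\n';
}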
@@ -1543,9 +1545,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
             if (disk->isBroken() || disk->isCustomDisk())
                 continue;
-            if (!defined_disk_names.contains(disk_name)
-                && disk->exists(relative_data_path)
-                && !disk_names_wrapped_in_cache.contains(disk_name))
+            if (!defined_disk_names.contains(disk_name) && disk->exists(relative_data_path))
             {
                 for (const auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
                 {
@@ -1553,9 +1553,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
                     {
                         throw Exception(
                             ErrorCodes::UNKNOWN_DISK,
-                            "Part {} ({}) was found on disk {} which is not defined in the storage policy (defined disks: {}, wrapped disks: {})",
-                            backQuote(it->name()), backQuote(it->path()), backQuote(disk_name),
-                            fmt::join(defined_disk_names, ", "), fmt::join(disk_names_wrapped_in_cache, ", "));
+                            "Part {} ({}) was found on disk {} which is not defined in the storage policy (defined disks: {})",
+                            backQuote(it->name()), backQuote(it->path()), backQuote(disk_name), fmt::join(defined_disk_names, ", "));
                     }
                 }
             }
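
The remaining two hunks simplify the actual check: any disk from the global disks map that is not in defined_disk_names but holds part-like directories under the table's relative data path still raises UNKNOWN_DISK, only now without the extra wrapped-disks clause in the condition and in the error message. A standalone sketch of that scan, with std::filesystem and a deliberately crude part-name test standing in for disk iteration and MergeTreePartInfo::tryParsePartName (all names and paths below are illustrative assumptions):

#include <filesystem>
#include <iostream>
#include <map>
#include <stdexcept>
#include <string>
#include <unordered_set>

namespace fs = std::filesystem;

// Very rough stand-in for MergeTreePartInfo::tryParsePartName: accept names like "all_1_1_0".
// Purely illustrative; the real parser is much stricter.
static bool looksLikePartName(const std::string & name)
{
    return name.find('_') != std::string::npos && name.front() != '.';
}

int main()
{
    // name -> root path of every configured disk (stand-in for getContext()->getDisksMap()).
    std::map<std::string, fs::path> disks_map = {
        {"default", "/tmp/clickhouse/default"},
        {"disk_s3_encrypted", "/tmp/clickhouse/s3"},
        {"object_storage", "/tmp/clickhouse/s3"},
    };

    // Names allowed for this table (built as in the previous sketch).
    std::unordered_set<std::string> defined_disk_names = {"disk_s3_encrypted", "object_storage"};

    const std::string relative_data_path = "store/000/00000000-0000-0000-0000-000000000000/";

    for (const auto & [disk_name, root] : disks_map)
    {
        if (defined_disk_names.contains(disk_name))
            continue;

        const fs::path table_path = root / relative_data_path;
        if (!fs::exists(table_path))
            continue;

        // Any part-like directory on an undefined disk means the storage policy is missing a disk.
        for (const auto & entry : fs::directory_iterator(table_path))
            if (looksLikePartName(entry.path().filename().string()))
                throw std::runtime_error("Part " + entry.path().filename().string()
                    + " was found on disk " + disk_name + " which is not defined in the storage policy");
    }

    std::cout << "no stray parts found\n";
}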

View File

@@ -14,7 +14,6 @@
         <disk_s3_encrypted>
             <type>encrypted</type>
             <disk>disk_s3</disk>
-            <path>encrypted/</path>
             <key>1234567812345678</key>
         </disk_s3_encrypted>
         <disk_local_encrypted>

View File

@@ -12,6 +12,7 @@ node = cluster.add_instance(
     main_configs=["configs/storage.xml"],
     tmpfs=["/disk:size=100M"],
     with_minio=True,
+    stay_alive=True,
 )
@@ -269,3 +270,25 @@ def test_read_in_order():
     node.query(
         "SELECT * FROM encrypted_test ORDER BY a, b SETTINGS optimize_read_in_order=0 FORMAT Null"
     )
+
+
+def test_restart():
+    node.query(
+        """
+        DROP TABLE IF EXISTS encrypted_test;
+        CREATE TABLE encrypted_test (
+            id Int64,
+            data String
+        ) ENGINE=MergeTree()
+        ORDER BY id
+        SETTINGS disk='disk_s3_encrypted'
+        """
+    )
+    node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
+
+    select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
+    assert node.query(select_query) == "(0,'data'),(1,'data')"
+
+    node.restart_clickhouse()
+    assert node.query(select_query) == "(0,'data'),(1,'data')"