mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #64778 from jkartseva/fix-plain-rewritable-cache
Fix crash in a local cache over `plain_rewritable` disk
This commit is contained in:
commit
12bf33c519
@ -39,6 +39,11 @@ ObjectStorageKey CachedObjectStorage::generateObjectKeyForPath(const std::string
|
||||
return object_storage->generateObjectKeyForPath(path);
|
||||
}
|
||||
|
||||
ObjectStorageKey CachedObjectStorage::generateObjectKeyPrefixForDirectoryPath(const std::string & path) const
|
||||
{
|
||||
return object_storage->generateObjectKeyPrefixForDirectoryPath(path);
|
||||
}
|
||||
|
||||
ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settings) const
|
||||
{
|
||||
ReadSettings modified_settings{read_settings};
|
||||
|
@ -100,6 +100,12 @@ public:
|
||||
|
||||
ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
|
||||
|
||||
ObjectStorageKey generateObjectKeyPrefixForDirectoryPath(const std::string & path) const override;
|
||||
|
||||
void setKeysGenerator(ObjectStorageKeysGeneratorPtr gen) override { object_storage->setKeysGenerator(gen); }
|
||||
|
||||
bool isPlain() const override { return object_storage->isPlain(); }
|
||||
|
||||
bool isRemote() const override { return object_storage->isRemote(); }
|
||||
|
||||
void removeCacheIfExists(const std::string & path_key_for_cache) override;
|
||||
|
@ -8,6 +8,13 @@
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
</disk_s3_plain_rewritable>
|
||||
<disk_cache_s3_plain_rewritable>
|
||||
<type>cache</type>
|
||||
<disk>disk_s3_plain_rewritable</disk>
|
||||
<path>/var/lib/clickhouse/disks/s3_plain_rewritable_cache/</path>
|
||||
<max_size>1000000000</max_size>
|
||||
<cache_on_write_operations>1</cache_on_write_operations>
|
||||
</disk_cache_s3_plain_rewritable>
|
||||
</disks>
|
||||
<policies>
|
||||
<s3_plain_rewritable>
|
||||
@ -17,6 +24,13 @@
|
||||
</main>
|
||||
</volumes>
|
||||
</s3_plain_rewritable>
|
||||
<cache_s3_plain_rewritable>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>disk_cache_s3_plain_rewritable</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</cache_s3_plain_rewritable>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
</clickhouse>
|
||||
|
@ -8,11 +8,8 @@ from helpers.cluster import ClickHouseCluster
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
NUM_WORKERS = 5
|
||||
|
||||
MAX_ROWS = 1000
|
||||
|
||||
dirs_created = []
|
||||
|
||||
|
||||
def gen_insert_values(size):
|
||||
return ",".join(
|
||||
@ -46,8 +43,14 @@ def start_cluster():
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.mark.order(0)
|
||||
def test_insert():
|
||||
@pytest.mark.parametrize(
|
||||
"storage_policy",
|
||||
[
|
||||
pytest.param("s3_plain_rewritable"),
|
||||
pytest.param("cache_s3_plain_rewritable"),
|
||||
],
|
||||
)
|
||||
def test(storage_policy):
|
||||
def create_insert(node, insert_values):
|
||||
node.query(
|
||||
"""
|
||||
@ -56,8 +59,10 @@ def test_insert():
|
||||
data String
|
||||
) ENGINE=MergeTree()
|
||||
ORDER BY id
|
||||
SETTINGS storage_policy='s3_plain_rewritable'
|
||||
"""
|
||||
SETTINGS storage_policy='{}'
|
||||
""".format(
|
||||
storage_policy
|
||||
)
|
||||
)
|
||||
node.query("INSERT INTO test VALUES {}".format(insert_values))
|
||||
|
||||
@ -107,25 +112,6 @@ def test_insert():
|
||||
!= -1
|
||||
)
|
||||
|
||||
created = int(
|
||||
node.query(
|
||||
"SELECT value FROM system.events WHERE event = 'DiskPlainRewritableS3DirectoryCreated'"
|
||||
)
|
||||
)
|
||||
assert created > 0
|
||||
dirs_created.append(created)
|
||||
assert (
|
||||
int(
|
||||
node.query(
|
||||
"SELECT value FROM system.metrics WHERE metric = 'DiskPlainRewritableS3DirectoryMapSize'"
|
||||
)
|
||||
)
|
||||
== created
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.order(1)
|
||||
def test_restart():
|
||||
insert_values_arr = []
|
||||
for i in range(NUM_WORKERS):
|
||||
node = cluster.instances[f"node{i + 1}"]
|
||||
@ -138,6 +124,7 @@ def test_restart():
|
||||
|
||||
threads = []
|
||||
for i in range(NUM_WORKERS):
|
||||
node = cluster.instances[f"node{i + 1}"]
|
||||
t = threading.Thread(target=restart, args=(node,))
|
||||
threads.append(t)
|
||||
t.start()
|
||||
@ -152,21 +139,10 @@ def test_restart():
|
||||
== insert_values_arr[i]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.order(2)
|
||||
def test_drop():
|
||||
for i in range(NUM_WORKERS):
|
||||
node = cluster.instances[f"node{i + 1}"]
|
||||
node.query("DROP TABLE IF EXISTS test SYNC")
|
||||
|
||||
removed = int(
|
||||
node.query(
|
||||
"SELECT value FROM system.events WHERE event = 'DiskPlainRewritableS3DirectoryRemoved'"
|
||||
)
|
||||
)
|
||||
|
||||
assert dirs_created[i] == removed
|
||||
|
||||
it = cluster.minio_client.list_objects(
|
||||
cluster.minio_bucket, "data/", recursive=True
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user