Revert "Sharding s3 key names (2)"

This commit is contained in:
alesapin 2022-09-13 12:29:02 +02:00 committed by GitHub
parent 3dd28bb79a
commit 9af591a328
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 51 additions and 84 deletions

View File

@ -31,7 +31,6 @@
#include <Common/logger_useful.h>
#include <Common/MultiVersion.h>
namespace DB
{
@ -91,19 +90,7 @@ void logIfError(const Aws::Utils::Outcome<Result, Error> & response, std::functi
std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path */)
{
/// Path to store the new S3 object.
/// Total length is 32 a-z characters for enough randomness.
/// First 3 characters are used as a prefix for
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-object-key-naming-pattern/
constexpr size_t key_name_total_size = 32;
constexpr size_t key_name_prefix_size = 3;
/// Path to store new S3 object.
return fmt::format("{}/{}",
getRandomASCIIString(key_name_prefix_size),
getRandomASCIIString(key_name_total_size - key_name_prefix_size));
return getRandomASCIIString(32);
}
Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const

View File

@ -3,7 +3,6 @@
#include <string>
#include <Disks/ObjectStorages/IObjectStorage_fwd.h>
namespace DB
{

View File

@ -350,7 +350,8 @@ if __name__ == "__main__":
# randomizer, we should remove it after Sep 2022
try:
subprocess.check_call(
f"docker volume ls -q | grep '{VOLUME_NAME}_.*_volume' | xargs --no-run-if-empty docker volume rm",
"docker volume rm $(docker volume ls -q | "
f"grep '{VOLUME_NAME}_.*_volume')",
shell=True,
)
except Exception as ex:

View File

@ -27,7 +27,7 @@ def cluster():
def assert_objects_count(cluster, objects_count, path="data/"):
minio = cluster.minio_client
s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
s3_objects = list(minio.list_objects(cluster.minio_bucket, path))
if objects_count != len(s3_objects):
for s3_object in s3_objects:
object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)

View File

@ -25,7 +25,7 @@ def cluster():
def assert_objects_count(cluster, objects_count, path="data/"):
minio = cluster.minio_client
s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
s3_objects = list(minio.list_objects(cluster.minio_bucket, path))
if objects_count != len(s3_objects):
for s3_object in s3_objects:
object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)

View File

@ -120,17 +120,11 @@ def run_s3_mocks(cluster):
def wait_for_delete_s3_objects(cluster, expected, timeout=30):
minio = cluster.minio_client
while timeout > 0:
if (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== expected
):
if len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == expected:
return
timeout -= 1
time.sleep(1)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== expected
)
assert len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == expected
@pytest.fixture(autouse=True)
@ -146,9 +140,7 @@ def drop_table(cluster, node_name):
wait_for_delete_s3_objects(cluster, 0)
finally:
# Remove extra objects to prevent tests cascade failing
for obj in list(
minio.list_objects(cluster.minio_bucket, "data/", recursive=True)
):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -170,7 +162,7 @@ def test_simple_insert_select(
node.query("INSERT INTO s3_test VALUES {}".format(values1))
assert node.query("SELECT * FROM s3_test order by dt, id FORMAT Values") == values1
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + files_per_part
)
@ -181,7 +173,7 @@ def test_simple_insert_select(
== values1 + "," + values2
)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + files_per_part * 2
)
@ -225,7 +217,7 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD_PER_PART_WIDE * 6 + FILES_OVERHEAD
)
@ -314,28 +306,28 @@ def test_attach_detach_partition(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test DETACH PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(4096)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test ATTACH PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test DROP PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(4096)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
)
@ -346,8 +338,7 @@ def test_attach_detach_partition(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
)
@ -365,21 +356,21 @@ def test_move_partition_to_another_disk(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 'hdd'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
)
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 's3'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -400,7 +391,7 @@ def test_table_manipulations(cluster, node_name):
node.query("RENAME TABLE s3_test TO s3_renamed")
assert node.query("SELECT count(*) FROM s3_renamed FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("RENAME TABLE s3_renamed TO s3_test")
@ -411,15 +402,14 @@ def test_table_manipulations(cluster, node_name):
node.query("ATTACH TABLE s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("TRUNCATE TABLE s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
)
@ -444,7 +434,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
)
@ -458,7 +448,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT count(*) FROM s3_clone FORMAT Values") == "(8192)"
# Number of objects in S3 should be unchanged.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4
)
@ -472,7 +462,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 6
)
@ -493,14 +483,14 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
# Data should remain in S3
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
)
node.query("ALTER TABLE s3_test FREEZE")
# Number S3 objects should be unchanged.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
)
@ -509,7 +499,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD_PER_PART_WIDE * 4)
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -530,7 +520,7 @@ def test_freeze_unfreeze(cluster, node_name):
node.query("TRUNCATE TABLE s3_test")
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -543,8 +533,7 @@ def test_freeze_unfreeze(cluster, node_name):
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
)
@ -567,7 +556,7 @@ def test_freeze_system_unfreeze(cluster, node_name):
node.query("TRUNCATE TABLE s3_test")
node.query("DROP TABLE s3_test_removed NO DELAY")
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -576,8 +565,7 @@ def test_freeze_system_unfreeze(cluster, node_name):
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
)
@ -704,7 +692,7 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name):
node.query("SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10")
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
minio = cluster.minio_client
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)

View File

@ -62,7 +62,7 @@ init_list = {
def get_s3_events(instance):
result = init_list.copy()
events = instance.query(
"SELECT event, value FROM system.events WHERE event LIKE '%S3%'"
"SELECT event,value FROM system.events WHERE event LIKE '%S3%'"
).split("\n")
for event in events:
ev = event.split("\t")
@ -85,20 +85,20 @@ def get_minio_stat(cluster):
)
).text.split("\n")
for line in stat:
x = re.search(r"s3_requests_total(\{.*\})?\s(\d+)(\s.*)?", line)
x = re.search("s3_requests_total(\{.*\})?\s(\d+)(\s.*)?", line)
if x != None:
y = re.search('.*api="(get|list|head|select).*', x.group(1))
if y != None:
result["get_requests"] += int(x.group(2))
else:
result["set_requests"] += int(x.group(2))
x = re.search(r"s3_errors_total(\{.*\})?\s(\d+)(\s.*)?", line)
x = re.search("s3_errors_total(\{.*\})?\s(\d+)(\s.*)?", line)
if x != None:
result["errors"] += int(x.group(2))
x = re.search(r"s3_rx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
x = re.search("s3_rx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
if x != None:
result["tx_bytes"] += float(x.group(2))
x = re.search(r"s3_tx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
x = re.search("s3_tx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
if x != None:
result["rx_bytes"] += float(x.group(2))
return result
@ -128,10 +128,8 @@ def get_query_stat(instance, hint):
def get_minio_size(cluster):
minio = cluster.minio_client
size = 0
for obj_level1 in minio.list_objects(
cluster.minio_bucket, prefix="data/", recursive=True
):
size += obj_level1.size
for obj in minio.list_objects(cluster.minio_bucket, "data/"):
size += obj.size
return size
@ -147,7 +145,7 @@ def test_profile_events(cluster):
metrics0 = get_s3_events(instance)
minio0 = get_minio_stat(cluster)
query1 = "CREATE TABLE test_s3.test_s3 (key UInt32, value UInt32) ENGINE=MergeTree PRIMARY KEY key ORDER BY key SETTINGS storage_policy = 's3'"
query1 = "CREATE TABLE test_s3.test_s3 (key UInt32, value UInt32) ENGINE=MergeTree PRIMARY KEY key ORDER BY key SETTINGS storage_policy='s3'"
instance.query(query1)
size1 = get_minio_size(cluster)
@ -169,7 +167,7 @@ def test_profile_events(cluster):
metrics1["WriteBufferFromS3Bytes"] - metrics0["WriteBufferFromS3Bytes"] == size1
)
query2 = "INSERT INTO test_s3.test_s3 VALUES"
query2 = "INSERT INTO test_s3.test_s3 FORMAT Values"
instance.query(query2 + " (1,1)")
size2 = get_minio_size(cluster)
@ -184,12 +182,9 @@ def test_profile_events(cluster):
metrics2["S3WriteRequestsCount"] - metrics1["S3WriteRequestsCount"]
== minio2["set_requests"] - minio1["set_requests"]
)
stat2 = get_query_stat(instance, query2)
for metric in stat2:
assert stat2[metric] == metrics2[metric] - metrics1[metric]
assert (
metrics2["WriteBufferFromS3Bytes"] - metrics1["WriteBufferFromS3Bytes"]
== size2 - size1
@ -210,7 +205,6 @@ def test_profile_events(cluster):
== minio3["set_requests"] - minio2["set_requests"]
)
stat3 = get_query_stat(instance, query3)
# With async reads profile events are not updated fully because reads are done in a separate thread.
# for metric in stat3:
# print(metric)

View File

@ -113,7 +113,7 @@ def drop_table(cluster):
minio = cluster.minio_client
# Remove extra objects to prevent tests cascade failing
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -130,9 +130,9 @@ def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_par
insert(cluster, node_idxs=[1, 2, 3], verify=True)
minio = cluster.minio_client
assert len(
list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
) == 3 * (FILES_OVERHEAD + files_per_part * 3)
assert len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == 3 * (
FILES_OVERHEAD + files_per_part * 3
)
def test_drop_cache_on_cluster(cluster):

View File

@ -87,7 +87,7 @@ def drop_table(cluster):
minio = cluster.minio_client
# Remove extra objects to prevent tests cascade failing
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -124,6 +124,6 @@ def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_par
)
minio = cluster.minio_client
assert len(
list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
) == (3 * FILES_OVERHEAD) + (files_per_part * 3)
assert len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == (
3 * FILES_OVERHEAD
) + (files_per_part * 3)

View File

@ -39,9 +39,7 @@ def cluster():
def get_large_objects_count(cluster, size=100, folder="data"):
minio = cluster.minio_client
counter = 0
for obj in minio.list_objects(
cluster.minio_bucket, "{}/".format(folder), recursive=True
):
for obj in minio.list_objects(cluster.minio_bucket, "{}/".format(folder)):
if obj.size is not None and obj.size >= size:
counter = counter + 1
return counter