Revert "Merge pull request #41249 from ClickHouse/revert-40968-s3-sharding-2"

This reverts commit ac60300997, reversing
changes made to a9c272283a.
This commit is contained in:
alesapin 2022-09-16 14:06:26 +02:00
parent 9597a47016
commit 456da30c9f
10 changed files with 84 additions and 51 deletions

View File

@ -31,6 +31,7 @@
#include <Common/logger_useful.h>
#include <Common/MultiVersion.h>
namespace DB
{
@ -90,7 +91,19 @@ void logIfError(const Aws::Utils::Outcome<Result, Error> & response, std::functi
std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path */)
{
return getRandomASCIIString(32);
/// Path to store the new S3 object.
/// Total length is 32 a-z characters for enough randomness.
/// First 3 characters are used as a prefix for
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-object-key-naming-pattern/
constexpr size_t key_name_total_size = 32;
constexpr size_t key_name_prefix_size = 3;
/// Path to store new S3 object.
return fmt::format("{}/{}",
getRandomASCIIString(key_name_prefix_size),
getRandomASCIIString(key_name_total_size - key_name_prefix_size));
}
Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const

View File

@ -3,6 +3,7 @@
#include <string>
#include <Disks/ObjectStorages/IObjectStorage_fwd.h>
namespace DB
{

View File

@ -350,8 +350,7 @@ if __name__ == "__main__":
# randomizer, we should remove it after Sep 2022
try:
subprocess.check_call(
"docker volume rm $(docker volume ls -q | "
f"grep '{VOLUME_NAME}_.*_volume')",
f"docker volume ls -q | grep '{VOLUME_NAME}_.*_volume' | xargs --no-run-if-empty docker volume rm",
shell=True,
)
except Exception as ex:

View File

@ -27,7 +27,7 @@ def cluster():
def assert_objects_count(cluster, objects_count, path="data/"):
minio = cluster.minio_client
s3_objects = list(minio.list_objects(cluster.minio_bucket, path))
s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
if objects_count != len(s3_objects):
for s3_object in s3_objects:
object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)

View File

@ -25,7 +25,7 @@ def cluster():
def assert_objects_count(cluster, objects_count, path="data/"):
minio = cluster.minio_client
s3_objects = list(minio.list_objects(cluster.minio_bucket, path))
s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
if objects_count != len(s3_objects):
for s3_object in s3_objects:
object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)

View File

@ -121,11 +121,17 @@ def run_s3_mocks(cluster):
def wait_for_delete_s3_objects(cluster, expected, timeout=30):
minio = cluster.minio_client
while timeout > 0:
if len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == expected:
if (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== expected
):
return
timeout -= 1
time.sleep(1)
assert len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == expected
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== expected
)
@pytest.fixture(autouse=True)
@ -141,7 +147,9 @@ def drop_table(cluster, node_name):
wait_for_delete_s3_objects(cluster, 0)
finally:
# Remove extra objects to prevent tests cascade failing
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
for obj in list(
minio.list_objects(cluster.minio_bucket, "data/", recursive=True)
):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -163,7 +171,7 @@ def test_simple_insert_select(
node.query("INSERT INTO s3_test VALUES {}".format(values1))
assert node.query("SELECT * FROM s3_test order by dt, id FORMAT Values") == values1
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + files_per_part
)
@ -174,7 +182,7 @@ def test_simple_insert_select(
== values1 + "," + values2
)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + files_per_part * 2
)
@ -218,7 +226,7 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD_PER_PART_WIDE * 6 + FILES_OVERHEAD
)
@ -307,28 +315,28 @@ def test_attach_detach_partition(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test DETACH PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(4096)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test ATTACH PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test DROP PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(4096)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
)
@ -339,7 +347,8 @@ def test_attach_detach_partition(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
)
@ -357,21 +366,21 @@ def test_move_partition_to_another_disk(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 'hdd'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
)
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 's3'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -392,7 +401,7 @@ def test_table_manipulations(cluster, node_name):
node.query("RENAME TABLE s3_test TO s3_renamed")
assert node.query("SELECT count(*) FROM s3_renamed FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("RENAME TABLE s3_renamed TO s3_test")
@ -403,14 +412,15 @@ def test_table_manipulations(cluster, node_name):
node.query("ATTACH TABLE s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("TRUNCATE TABLE s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
)
@ -435,7 +445,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
)
@ -449,7 +459,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT count(*) FROM s3_clone FORMAT Values") == "(8192)"
# Number of objects in S3 should be unchanged.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4
)
@ -463,7 +473,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 6
)
@ -484,14 +494,14 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
# Data should remain in S3
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
)
node.query("ALTER TABLE s3_test FREEZE")
# Number S3 objects should be unchanged.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
)
@ -500,7 +510,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD_PER_PART_WIDE * 4)
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -521,7 +531,7 @@ def test_freeze_unfreeze(cluster, node_name):
node.query("TRUNCATE TABLE s3_test")
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -536,7 +546,8 @@ def test_freeze_unfreeze(cluster, node_name):
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
)
@ -559,7 +570,7 @@ def test_freeze_system_unfreeze(cluster, node_name):
node.query("TRUNCATE TABLE s3_test")
node.query("DROP TABLE s3_test_removed NO DELAY")
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -570,7 +581,8 @@ def test_freeze_system_unfreeze(cluster, node_name):
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
)
@ -697,7 +709,7 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name):
node.query("SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10")
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
minio = cluster.minio_client
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
minio.remove_object(cluster.minio_bucket, obj.object_name)

View File

@ -85,20 +85,20 @@ def get_minio_stat(cluster):
)
).text.split("\n")
for line in stat:
x = re.search("s3_requests_total(\{.*\})?\s(\d+)(\s.*)?", line)
x = re.search(r"s3_requests_total(\{.*\})?\s(\d+)(\s.*)?", line)
if x != None:
y = re.search('.*api="(get|list|head|select).*', x.group(1))
if y != None:
result["get_requests"] += int(x.group(2))
else:
result["set_requests"] += int(x.group(2))
x = re.search("s3_errors_total(\{.*\})?\s(\d+)(\s.*)?", line)
x = re.search(r"s3_errors_total(\{.*\})?\s(\d+)(\s.*)?", line)
if x != None:
result["errors"] += int(x.group(2))
x = re.search("s3_rx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
x = re.search(r"s3_rx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
if x != None:
result["tx_bytes"] += float(x.group(2))
x = re.search("s3_tx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
x = re.search(r"s3_tx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
if x != None:
result["rx_bytes"] += float(x.group(2))
return result
@ -128,8 +128,10 @@ def get_query_stat(instance, hint):
def get_minio_size(cluster):
minio = cluster.minio_client
size = 0
for obj in minio.list_objects(cluster.minio_bucket, "data/"):
size += obj.size
for obj_level1 in minio.list_objects(
cluster.minio_bucket, prefix="data/", recursive=True
):
size += obj_level1.size
return size
@ -167,7 +169,7 @@ def test_profile_events(cluster):
metrics1["WriteBufferFromS3Bytes"] - metrics0["WriteBufferFromS3Bytes"] == size1
)
query2 = "INSERT INTO test_s3.test_s3 FORMAT Values"
query2 = "INSERT INTO test_s3.test_s3 VALUES"
instance.query(query2 + " (1,1)")
size2 = get_minio_size(cluster)
@ -182,9 +184,12 @@ def test_profile_events(cluster):
metrics2["S3WriteRequestsCount"] - metrics1["S3WriteRequestsCount"]
== minio2["set_requests"] - minio1["set_requests"]
)
stat2 = get_query_stat(instance, query2)
for metric in stat2:
assert stat2[metric] == metrics2[metric] - metrics1[metric]
assert (
metrics2["WriteBufferFromS3Bytes"] - metrics1["WriteBufferFromS3Bytes"]
== size2 - size1
@ -205,6 +210,7 @@ def test_profile_events(cluster):
== minio3["set_requests"] - minio2["set_requests"]
)
stat3 = get_query_stat(instance, query3)
# With async reads profile events are not updated fully because reads are done in a separate thread.
# for metric in stat3:
# print(metric)

View File

@ -113,7 +113,7 @@ def drop_table(cluster):
minio = cluster.minio_client
# Remove extra objects to prevent tests cascade failing
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -130,9 +130,9 @@ def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_par
insert(cluster, node_idxs=[1, 2, 3], verify=True)
minio = cluster.minio_client
assert len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == 3 * (
FILES_OVERHEAD + files_per_part * 3
)
assert len(
list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
) == 3 * (FILES_OVERHEAD + files_per_part * 3)
def test_drop_cache_on_cluster(cluster):

View File

@ -87,7 +87,7 @@ def drop_table(cluster):
minio = cluster.minio_client
# Remove extra objects to prevent tests cascade failing
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -124,6 +124,6 @@ def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_par
)
minio = cluster.minio_client
assert len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == (
3 * FILES_OVERHEAD
) + (files_per_part * 3)
assert len(
list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
) == (3 * FILES_OVERHEAD) + (files_per_part * 3)

View File

@ -39,7 +39,9 @@ def cluster():
def get_large_objects_count(cluster, size=100, folder="data"):
minio = cluster.minio_client
counter = 0
for obj in minio.list_objects(cluster.minio_bucket, "{}/".format(folder)):
for obj in minio.list_objects(
cluster.minio_bucket, "{}/".format(folder), recursive=True
):
if obj.size is not None and obj.size >= size:
counter = counter + 1
return counter