kssenii 2023-09-15 14:21:08 +02:00
parent 342755d35e
commit 6846fe3c58
2 changed files with 97 additions and 51 deletions


@@ -1,3 +1,4 @@
#include <set>
#include "Common/Exception.h"
#include "Common/ZooKeeper/Types.h"
#include "Interpreters/Context_fwd.h"
@@ -253,7 +254,10 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(const String & path)
Coordination::Responses responses;
auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
{
LOG_TEST(log, "Moved file `{}` to processed", path);
return;
}
/// TODO this could be because of the expired session.
if (responses[0]->error != Coordination::Error::ZOK)
@@ -433,11 +437,16 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
};
auto node_cmp = [](const Node & a, const Node & b)
{
return a.metadata.last_processed_timestamp < b.metadata.last_processed_timestamp;
if (a.metadata.last_processed_timestamp == b.metadata.last_processed_timestamp)
return a.metadata.file_path < b.metadata.file_path;
else
return a.metadata.last_processed_timestamp < b.metadata.last_processed_timestamp;
};
/// Ordered in ascending order of timestamps.
std::set<Node, decltype(node_cmp)> sorted_nodes(node_cmp);
std::multiset<Node, decltype(node_cmp)> sorted_nodes(node_cmp);
LOG_TRACE(log, "Found {} nodes", nodes.size());
for (const auto & node : nodes)
{
@@ -446,9 +455,11 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
std::string metadata_str;
if (zk_client->tryGet(zookeeper_processed_path / node, metadata_str))
{
bool inserted = sorted_nodes.emplace(node, NodeMetadata::fromString(metadata_str)).second;
chassert(inserted);
sorted_nodes.emplace(node, NodeMetadata::fromString(metadata_str));
LOG_TEST(log, "Fetched metadata for node {}", node);
}
else
LOG_TEST(log, "Failed to fetch node metadata {}", node);
}
catch (...)
{
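
The switch above from std::set to std::multiset, together with dropping the chassert(inserted), matters when two files were processed within the same second: with the old comparator, which orders only by last_processed_timestamp, such entries compare equivalent, so a std::set silently rejects the second insert (and the assertion could fire), losing a node from the cleanup pass. A std::multiset keeps equivalent entries, and the new file_path tie-break keeps the order deterministic. The following standalone sketch, with Node as a cut-down stand-in for the metadata entries above rather than the actual ClickHouse types, illustrates the difference:

```cpp
#include <cassert>
#include <cstdint>
#include <iostream>
#include <set>
#include <string>

// Cut-down stand-in for the node metadata collected by cleanupThreadFuncImpl().
struct Node
{
    std::string file_path;
    uint64_t last_processed_timestamp;
};

int main()
{
    // Old comparator: orders by timestamp only, so two nodes processed in the
    // same second are "equivalent" as far as the container is concerned.
    auto by_timestamp = [](const Node & a, const Node & b)
    {
        return a.last_processed_timestamp < b.last_processed_timestamp;
    };

    std::set<Node, decltype(by_timestamp)> unique_nodes(by_timestamp);
    unique_nodes.emplace(Node{"file_a.csv", 100});
    bool inserted = unique_nodes.emplace(Node{"file_b.csv", 100}).second;
    assert(!inserted);  // std::set silently dropped the second node
    std::cout << "std::set kept " << unique_nodes.size() << " node(s)\n";       // prints 1

    // New comparator: tie-break on file_path, stored in a multiset so that
    // entries with equal timestamps are never lost.
    auto by_timestamp_then_path = [](const Node & a, const Node & b)
    {
        if (a.last_processed_timestamp == b.last_processed_timestamp)
            return a.file_path < b.file_path;
        return a.last_processed_timestamp < b.last_processed_timestamp;
    };

    std::multiset<Node, decltype(by_timestamp_then_path)> sorted_nodes(by_timestamp_then_path);
    sorted_nodes.emplace(Node{"file_a.csv", 100});
    sorted_nodes.emplace(Node{"file_b.csv", 100});
    std::cout << "std::multiset kept " << sorted_nodes.size() << " node(s)\n";  // prints 2
}
```
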
@@ -458,7 +469,14 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
/// TODO add a zookeeper lock for cleanup
LOG_TRACE(log, "Checking node limits");
auto get_nodes_str = [&]()
{
WriteBufferFromOwnString wb;
for (const auto & [node, metadata] : sorted_nodes)
wb << fmt::format("Node: {}, path: {}, timestamp: {};\n", node, metadata.file_path, metadata.last_processed_timestamp);
return wb.str();
};
LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", max_set_size, max_set_age_sec, get_nodes_str());
size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes.size() - max_set_size : 0;
for (const auto & node : sorted_nodes)
@@ -466,7 +484,8 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
if (nodes_to_remove)
{
auto path = zookeeper_processed_path / node.name;
LOG_TEST(log, "Removing node at path `{}` because max files limit is reached", path.string());
LOG_TEST(log, "Removing node at path {} ({}) because max files limit is reached",
node.metadata.file_path, path.string());
auto code = zk_client->tryRemove(path);
if (code == Coordination::Error::ZOK)
@@ -480,7 +499,8 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
if (node_age >= max_set_age_sec)
{
auto path = zookeeper_processed_path / node.name;
LOG_TEST(log, "Removing node at path `{}` because file ttl is reached", path.string());
LOG_TEST(log, "Removing node at path {} ({}) because file is reached",
node.metadata.file_path, path.string());
auto code = zk_client->tryRemove(path);
if (code != Coordination::Error::ZOK)
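
The rest of cleanupThreadFuncImpl() walks sorted_nodes oldest-first and applies two limits: when the processed set exceeds the tracked-files limit, the oldest nodes.size() - max_set_size entries are removed first, and entries whose age exceeds max_set_age_sec are removed as well; the real code does the removal by calling tryRemove on the corresponding ZooKeeper path and logging the outcome. Below is a simplified standalone model of that pruning decision only, reusing the Node stand-in from the previous sketch; the settings values are hypothetical and the else-if structure is a simplification of the actual branching:

```cpp
#include <cstdint>
#include <iostream>
#include <set>
#include <string>

// Same cut-down Node as in the previous sketch; the real code reads these
// entries from the .../processed ZooKeeper path.
struct Node
{
    std::string file_path;
    uint64_t last_processed_timestamp;
};

int main()
{
    // Hypothetical values standing in for s3queue_tracked_files_limit and
    // s3queue_tracked_file_ttl_sec.
    const size_t max_set_size = 2;
    const uint64_t max_set_age_sec = 60;
    const uint64_t now = 1000;

    auto node_cmp = [](const Node & a, const Node & b)
    {
        if (a.last_processed_timestamp == b.last_processed_timestamp)
            return a.file_path < b.file_path;
        return a.last_processed_timestamp < b.last_processed_timestamp;
    };

    /// Ordered in ascending order of timestamps, i.e. oldest entries first.
    std::multiset<Node, decltype(node_cmp)> sorted_nodes(node_cmp);
    sorted_nodes.emplace(Node{"a.csv", 900});
    sorted_nodes.emplace(Node{"b.csv", 950});
    sorted_nodes.emplace(Node{"c.csv", 990});

    // Limit 1: keep at most max_set_size entries, dropping the oldest first.
    size_t nodes_to_remove = sorted_nodes.size() > max_set_size
        ? sorted_nodes.size() - max_set_size
        : 0;

    for (const auto & node : sorted_nodes)
    {
        if (nodes_to_remove)
        {
            std::cout << "remove " << node.file_path << " (max files limit reached)\n";
            --nodes_to_remove;
        }
        // Limit 2: drop entries whose age exceeds the TTL.
        else if (now - node.last_processed_timestamp >= max_set_age_sec)
        {
            std::cout << "remove " << node.file_path << " (file ttl reached)\n";
        }
    }
    // With these numbers only a.csv is removed: it is the oldest entry beyond
    // the size limit, while b.csv and c.csv are both within the TTL.
}
```
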


@@ -159,6 +159,7 @@ def generate_random_files(
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
print(f"File {filename}, content: {total_values}")
put_s3_file_content(started_cluster, filename, values_csv)
return total_values
@@ -755,9 +756,10 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
def test_max_set_age(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"max_set_age"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
max_age = 1
max_age = 10
files_to_generate = 10
create_table(
@@ -768,68 +770,92 @@ def test_max_set_age(started_cluster):
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_tracked_files_limit": 10,
"s3queue_tracked_file_ttl_sec": max_age,
},
)
node.wait_for_log_line("Checking node limits")
node.wait_for_log_line("Node limits check finished")
create_mv(node, table_name, dst_table_name)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, row_num=1
)
res1 = [
list(map(int, l.split()))
for l in run_query(node, f"SELECT * FROM {table_name}").splitlines()
]
assert res1 == total_values
expected_rows = 10
node.wait_for_log_line("Checking node limits")
node.wait_for_log_line("Node limits check finished")
def get_count():
return int(node.query(f"SELECT count() FROM {dst_table_name}"))
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
assert 10 == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))
time.sleep(max_age + 1)
res1 = [
list(map(int, l.split()))
for l in run_query(node, f"SELECT * FROM {table_name}").splitlines()
expected_rows = 20
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
assert 10 == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))
paths_count = [
int(x)
for x in node.query(
f"SELECT count() from {dst_table_name} GROUP BY _path"
).splitlines()
]
assert res1 == total_values
assert 10 == len(paths_count)
for path_count in paths_count:
assert 2 == path_count
def test_max_set_size(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"max_set_size"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
max_age = 10
files_to_generate = 10
files_path = f"test_multiple"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_set_size',
s3queue_tracked_files_limit = {files_to_generate - 1};
"""
create_table(
started_cluster,
node,
table_name,
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_tracked_files_limit": 9,
},
)
total_values = generate_random_files(
files_to_generate, files_path, started_cluster, bucket, start_ind=0, row_num=1
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == total_values
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
get_query = f"SELECT * FROM {table_name}"
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
assert res1 == total_values
print(total_values)
time.sleep(10)
zk = started_cluster.get_kazoo_client("zoo1")
processed_nodes = zk.get_children(f"{keeper_path}/processed/")
assert len(processed_nodes) == 9
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
assert res1 == [total_values[0]]
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
time.sleep(10)
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
assert res1 == [total_values[1]]