Merge pull request #69742 from ClickHouse/fix-s3-queue-ttl-sec

s3queue: fix tracked_files_ttl_sec

The table metadata serialized this setting under the key `tracked_file_ttl_sec`
but parsed it back under `tracked_files_ttl_sec`, so the value round-tripped
through ZooKeeper as 0 and TTL-based cleanup of tracked files stopped working
once the metadata was reloaded (e.g. after a server restart). This change uses
`tracked_files_ttl_sec` consistently on both the write and read paths, and keeps
a fallback to the legacy key when parsing metadata written by older versions.
Author: Kseniia Sumarokova (committed via GitHub)
Date:   2024-09-19 08:35:08 +00:00
Commit: f8fb4fb120
4 changed files with 24 additions and 14 deletions

@@ -122,7 +122,7 @@ ObjectStorageQueueMetadata::ObjectStorageQueueMetadata(
     , local_file_statuses(std::make_shared<LocalFileStatuses>())
 {
     if (mode == ObjectStorageQueueMode::UNORDERED
-        && (table_metadata.tracked_files_limit || table_metadata.tracked_file_ttl_sec))
+        && (table_metadata.tracked_files_limit || table_metadata.tracked_files_ttl_sec))
     {
         task = Context::getGlobalContextInstance()->getSchedulePool().createTask(
             "ObjectStorageQueueCleanupFunc",
@@ -366,9 +366,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
         return;
     }

-    chassert(table_metadata.tracked_files_limit || table_metadata.tracked_file_ttl_sec);
+    chassert(table_metadata.tracked_files_limit || table_metadata.tracked_files_ttl_sec);
     const bool check_nodes_limit = table_metadata.tracked_files_limit > 0;
-    const bool check_nodes_ttl = table_metadata.tracked_file_ttl_sec > 0;
+    const bool check_nodes_ttl = table_metadata.tracked_files_ttl_sec > 0;
     const bool nodes_limit_exceeded = nodes_num > table_metadata.tracked_files_limit;

     if ((!nodes_limit_exceeded || !check_nodes_limit) && !check_nodes_ttl)
@@ -443,7 +443,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
             wb << fmt::format("Node: {}, path: {}, timestamp: {};\n", node, metadata.file_path, metadata.last_processed_timestamp);
         return wb.str();
     };

-    LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", table_metadata.tracked_files_limit, table_metadata.tracked_file_ttl_sec, get_nodes_str());
+    LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}",
+             table_metadata.tracked_files_limit, table_metadata.tracked_files_ttl_sec, get_nodes_str());

     size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes_num - table_metadata.tracked_files_limit : 0;
     for (const auto & node : sorted_nodes)
@@ -464,7 +466,7 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
         else if (check_nodes_ttl)
         {
             UInt64 node_age = getCurrentTime() - node.metadata.last_processed_timestamp;
-            if (node_age >= table_metadata.tracked_file_ttl_sec)
+            if (node_age >= table_metadata.tracked_files_ttl_sec)
             {
                 LOG_TRACE(log, "Removing node at path {} ({}) because file ttl is reached",
                     node.metadata.file_path, node.zk_path);

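Not part of the commit: for orientation, the TTL branch above reduces to comparing each node's age against tracked_files_ttl_sec. A minimal standalone sketch, with TableMetadata, TrackedNode and getCurrentTime as simplified stand-ins for the real ObjectStorageQueue types:

// Illustrative sketch only; these types are simplified stand-ins,
// not the actual ClickHouse definitions.
#include <chrono>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct TableMetadata
{
    uint64_t tracked_files_limit = 0;   // 0 disables the size check
    uint64_t tracked_files_ttl_sec = 0; // 0 disables the TTL check
};

struct TrackedNode
{
    std::string file_path;
    uint64_t last_processed_timestamp = 0; // unix seconds
};

static uint64_t getCurrentTime()
{
    return std::chrono::duration_cast<std::chrono::seconds>(
        std::chrono::system_clock::now().time_since_epoch()).count();
}

// A node expires once its age reaches tracked_files_ttl_sec, mirroring the
// `node_age >= table_metadata.tracked_files_ttl_sec` check in the hunk above.
static std::vector<std::string> collectExpiredPaths(
    const TableMetadata & meta, const std::vector<TrackedNode> & nodes)
{
    std::vector<std::string> expired;
    if (meta.tracked_files_ttl_sec == 0)
        return expired;
    for (const auto & node : nodes)
        if (getCurrentTime() - node.last_processed_timestamp >= meta.tracked_files_ttl_sec)
            expired.push_back(node.file_path);
    return expired;
}

int main()
{
    TableMetadata meta;
    meta.tracked_files_ttl_sec = 3600;
    std::vector<TrackedNode> nodes = {
        {"data/old.csv", getCurrentTime() - 7200}, // older than the TTL
        {"data/new.csv", getCurrentTime()},        // fresh
    };
    for (const auto & path : collectExpiredPaths(meta, nodes))
        std::cout << "would remove: " << path << '\n'; // prints data/old.csv only
}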
@@ -45,7 +45,7 @@ ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(
     , after_processing(engine_settings.after_processing.toString())
     , mode(engine_settings.mode.toString())
     , tracked_files_limit(engine_settings.tracked_files_limit)
-    , tracked_file_ttl_sec(engine_settings.tracked_file_ttl_sec)
+    , tracked_files_ttl_sec(engine_settings.tracked_file_ttl_sec)
     , buckets(engine_settings.buckets)
     , processing_threads_num(engine_settings.processing_threads_num)
     , last_processed_path(engine_settings.last_processed_path)
@@ -59,7 +59,7 @@ String ObjectStorageQueueTableMetadata::toString() const
     json.set("after_processing", after_processing);
     json.set("mode", mode);
     json.set("tracked_files_limit", tracked_files_limit);
-    json.set("tracked_file_ttl_sec", tracked_file_ttl_sec);
+    json.set("tracked_files_ttl_sec", tracked_files_ttl_sec);
     json.set("processing_threads_num", processing_threads_num);
     json.set("buckets", buckets);
     json.set("format_name", format_name);
@@ -100,7 +100,7 @@ ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(const Poco::JSO
     , after_processing(json->getValue<String>("after_processing"))
     , mode(json->getValue<String>("mode"))
     , tracked_files_limit(getOrDefault(json, "tracked_files_limit", "s3queue_", 0))
-    , tracked_file_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "s3queue_", 0))
+    , tracked_files_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "", getOrDefault(json, "tracked_file_ttl_sec", "s3queue_", 0)))
     , buckets(getOrDefault(json, "buckets", "", 0))
     , processing_threads_num(getOrDefault(json, "processing_threads_num", "s3queue_", 1))
     , last_processed_path(getOrDefault<String>(json, "last_processed_file", "s3queue_", ""))
@@ -142,18 +142,18 @@ void ObjectStorageQueueTableMetadata::checkImmutableFieldsEquals(const ObjectSto
     if (tracked_files_limit != from_zk.tracked_files_limit)
         throw Exception(
             ErrorCodes::METADATA_MISMATCH,
-            "Existing table metadata in ZooKeeper differs in max set size. "
+            "Existing table metadata in ZooKeeper differs in `tracked_files_limit`. "
             "Stored in ZooKeeper: {}, local: {}",
             from_zk.tracked_files_limit,
             tracked_files_limit);

-    if (tracked_file_ttl_sec != from_zk.tracked_file_ttl_sec)
+    if (tracked_files_ttl_sec != from_zk.tracked_files_ttl_sec)
         throw Exception(
             ErrorCodes::METADATA_MISMATCH,
-            "Existing table metadata in ZooKeeper differs in max set age. "
+            "Existing table metadata in ZooKeeper differs in `tracked_files_ttl_sec`. "
             "Stored in ZooKeeper: {}, local: {}",
-            from_zk.tracked_file_ttl_sec,
-            tracked_file_ttl_sec);
+            from_zk.tracked_files_ttl_sec,
+            tracked_files_ttl_sec);

     if (format_name != from_zk.format_name)
         throw Exception(

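The parsing change above is the backward-compatibility core of the fix: the new key is tried first, and its default value is itself a lookup of the legacy key. A minimal standalone sketch of that pattern, with a std::map standing in for the Poco JSON object and an illustrative re-implementation of getOrDefault (the real helper's exact lookup order is an assumption; it is not shown in this diff):

// Illustrative sketch only; Json and getOrDefault are stand-ins,
// not the actual ClickHouse helpers.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

using Json = std::map<std::string, uint64_t>;

// Assumed lookup order: try `key`, then `prefix + key`, then fall back
// to `default_value`.
uint64_t getOrDefault(const Json & json, const std::string & key,
                      const std::string & prefix, uint64_t default_value)
{
    if (auto it = json.find(key); it != json.end())
        return it->second;
    if (!prefix.empty())
        if (auto it = json.find(prefix + key); it != json.end())
            return it->second;
    return default_value;
}

int main()
{
    // Metadata written by an older server: only the misspelled key exists.
    Json old_metadata{{"tracked_file_ttl_sec", 300}};

    // New key first; the default is itself a lookup of the legacy key,
    // so metadata written by either version parses to 300 here.
    uint64_t ttl = getOrDefault(old_metadata, "tracked_files_ttl_sec", "",
                                getOrDefault(old_metadata, "tracked_file_ttl_sec", "s3queue_", 0));
    std::cout << "tracked_files_ttl_sec = " << ttl << '\n'; // prints 300
}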
@@ -23,7 +23,7 @@ struct ObjectStorageQueueTableMetadata
     const String after_processing;
     const String mode;
     const UInt64 tracked_files_limit;
-    const UInt64 tracked_file_ttl_sec;
+    const UInt64 tracked_files_ttl_sec;
     const UInt64 buckets;
     const UInt64 processing_threads_num;
     const String last_processed_path;

@@ -976,6 +976,14 @@ def test_max_set_age(started_cluster):
         )
     )

+    node.restart_clickhouse()
+
+    expected_rows *= 2
+    wait_for_condition(lambda: get_count() == expected_rows)
+    assert files_to_generate == int(
+        node.query(f"SELECT uniq(_path) from {dst_table_name}")
+    )
+

 def test_max_set_size(started_cluster):
     node = started_cluster.instances["instance"]