Fix tracked_files_ttl_sec

commit 373927d6a5 (parent a997cfad2b)
Author: kssenii
Date: 2024-09-18 19:25:18 +02:00

4 changed files with 24 additions and 14 deletions

View File

@@ -122,7 +122,7 @@ ObjectStorageQueueMetadata::ObjectStorageQueueMetadata(
     , local_file_statuses(std::make_shared<LocalFileStatuses>())
 {
     if (mode == ObjectStorageQueueMode::UNORDERED
-        && (table_metadata.tracked_files_limit || table_metadata.tracked_file_ttl_sec))
+        && (table_metadata.tracked_files_limit || table_metadata.tracked_files_ttl_sec))
     {
         task = Context::getGlobalContextInstance()->getSchedulePool().createTask(
             "ObjectStorageQueueCleanupFunc",
@@ -366,9 +366,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
         return;
     }

-    chassert(table_metadata.tracked_files_limit || table_metadata.tracked_file_ttl_sec);
+    chassert(table_metadata.tracked_files_limit || table_metadata.tracked_files_ttl_sec);
     const bool check_nodes_limit = table_metadata.tracked_files_limit > 0;
-    const bool check_nodes_ttl = table_metadata.tracked_file_ttl_sec > 0;
+    const bool check_nodes_ttl = table_metadata.tracked_files_ttl_sec > 0;

     const bool nodes_limit_exceeded = nodes_num > table_metadata.tracked_files_limit;
     if ((!nodes_limit_exceeded || !check_nodes_limit) && !check_nodes_ttl)
@@ -443,7 +443,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
             wb << fmt::format("Node: {}, path: {}, timestamp: {};\n", node, metadata.file_path, metadata.last_processed_timestamp);
         return wb.str();
     };

-    LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", table_metadata.tracked_files_limit, table_metadata.tracked_file_ttl_sec, get_nodes_str());
+    LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}",
+             table_metadata.tracked_files_limit, table_metadata.tracked_files_ttl_sec, get_nodes_str());
+
     size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes_num - table_metadata.tracked_files_limit : 0;
     for (const auto & node : sorted_nodes)
@@ -464,7 +466,7 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
         else if (check_nodes_ttl)
         {
            UInt64 node_age = getCurrentTime() - node.metadata.last_processed_timestamp;
-           if (node_age >= table_metadata.tracked_file_ttl_sec)
+           if (node_age >= table_metadata.tracked_files_ttl_sec)
            {
                LOG_TRACE(log, "Removing node at path {} ({}) because file ttl is reached",
                          node.metadata.file_path, node.zk_path);
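Note: the hunks above all feed the cleanup task that prunes tracked-file nodes once the UNORDERED-mode limits are configured. As a reading aid only — a simplified, hypothetical sketch, not the actual cleanupThreadFuncImpl() — the pruning decision boils down to the following, assuming the nodes are sorted oldest first:

#include <cstdint>
#include <vector>

// Hypothetical sketch of the pruning policy referenced above (simplified types,
// not the real ClickHouse code): a node is dropped either to bring the set back
// under tracked_files_limit or because its age has reached tracked_files_ttl_sec.
struct TrackedNode { uint64_t last_processed_timestamp = 0; };

size_t nodesToPrune(const std::vector<TrackedNode> & sorted_nodes,  // oldest first
                    uint64_t tracked_files_limit,
                    uint64_t tracked_files_ttl_sec,
                    uint64_t now)
{
    const bool check_limit = tracked_files_limit > 0;
    const bool check_ttl = tracked_files_ttl_sec > 0;

    const size_t remove_for_limit = (check_limit && sorted_nodes.size() > tracked_files_limit)
        ? sorted_nodes.size() - tracked_files_limit
        : 0;

    size_t removed = 0;
    for (const auto & node : sorted_nodes)
    {
        if (removed < remove_for_limit)
        {
            ++removed;  // trim the oldest entries until the size limit holds again
            continue;
        }
        if (check_ttl && now - node.last_processed_timestamp >= tracked_files_ttl_sec)
            ++removed;  // entry has outlived its TTL
    }
    return removed;
}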

View File

@@ -45,7 +45,7 @@ ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(
     , after_processing(engine_settings.after_processing.toString())
     , mode(engine_settings.mode.toString())
     , tracked_files_limit(engine_settings.tracked_files_limit)
-    , tracked_file_ttl_sec(engine_settings.tracked_file_ttl_sec)
+    , tracked_files_ttl_sec(engine_settings.tracked_file_ttl_sec)
     , buckets(engine_settings.buckets)
     , processing_threads_num(engine_settings.processing_threads_num)
     , last_processed_path(engine_settings.last_processed_path)
@@ -59,7 +59,7 @@ String ObjectStorageQueueTableMetadata::toString() const
     json.set("after_processing", after_processing);
     json.set("mode", mode);
     json.set("tracked_files_limit", tracked_files_limit);
-    json.set("tracked_file_ttl_sec", tracked_file_ttl_sec);
+    json.set("tracked_files_ttl_sec", tracked_files_ttl_sec);
     json.set("processing_threads_num", processing_threads_num);
     json.set("buckets", buckets);
     json.set("format_name", format_name);
@@ -100,7 +100,7 @@ ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(const Poco::JSO
     , after_processing(json->getValue<String>("after_processing"))
     , mode(json->getValue<String>("mode"))
     , tracked_files_limit(getOrDefault(json, "tracked_files_limit", "s3queue_", 0))
-    , tracked_file_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "s3queue_", 0))
+    , tracked_files_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "", getOrDefault(json, "tracked_file_ttl_sec", "s3queue_", 0)))
     , buckets(getOrDefault(json, "buckets", "", 0))
     , processing_threads_num(getOrDefault(json, "processing_threads_num", "s3queue_", 1))
     , last_processed_path(getOrDefault<String>(json, "last_processed_file", "s3queue_", ""))
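Note: the constructor change above keeps metadata written by older servers readable. Previously toString() stored the value under the key tracked_file_ttl_sec while this constructor read tracked_files_ttl_sec, so the TTL could be lost when the metadata was reloaded from ZooKeeper; now the corrected key is preferred and the legacy key is used as a fallback. A minimal sketch of that fallback with Poco::JSON (the helper name and signature below are hypothetical; the real code goes through the getOrDefault helper seen in the diff):

#include <cstdint>
#include <Poco/JSON/Object.h>

// Hypothetical helper illustrating the fallback performed in the constructor above:
// prefer the corrected key, fall back to the legacy key, otherwise default to 0.
uint64_t readTrackedFilesTTL(const Poco::JSON::Object::Ptr & json)
{
    if (json->has("tracked_files_ttl_sec"))
        return json->getValue<uint64_t>("tracked_files_ttl_sec");
    if (json->has("tracked_file_ttl_sec"))  // key written by older versions
        return json->getValue<uint64_t>("tracked_file_ttl_sec");
    return 0;
}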
@@ -142,18 +142,18 @@ void ObjectStorageQueueTableMetadata::checkImmutableFieldsEquals(const ObjectSto
     if (tracked_files_limit != from_zk.tracked_files_limit)
         throw Exception(
             ErrorCodes::METADATA_MISMATCH,
-            "Existing table metadata in ZooKeeper differs in max set size. "
+            "Existing table metadata in ZooKeeper differs in `tracked_files_limit`. "
             "Stored in ZooKeeper: {}, local: {}",
             from_zk.tracked_files_limit,
             tracked_files_limit);

-    if (tracked_file_ttl_sec != from_zk.tracked_file_ttl_sec)
+    if (tracked_files_ttl_sec != from_zk.tracked_files_ttl_sec)
         throw Exception(
             ErrorCodes::METADATA_MISMATCH,
-            "Existing table metadata in ZooKeeper differs in max set age. "
+            "Existing table metadata in ZooKeeper differs in `tracked_files_ttl_sec`. "
             "Stored in ZooKeeper: {}, local: {}",
-            from_zk.tracked_file_ttl_sec,
-            tracked_file_ttl_sec);
+            from_zk.tracked_files_ttl_sec,
+            tracked_files_ttl_sec);

     if (format_name != from_zk.format_name)
         throw Exception(

View File

@@ -23,7 +23,7 @@ struct ObjectStorageQueueTableMetadata
     const String after_processing;
     const String mode;
     const UInt64 tracked_files_limit;
-    const UInt64 tracked_file_ttl_sec;
+    const UInt64 tracked_files_ttl_sec;
     const UInt64 buckets;
     const UInt64 processing_threads_num;
     const String last_processed_path;

View File

@@ -976,6 +976,14 @@ def test_max_set_age(started_cluster):
         )
     )

+    node.restart_clickhouse()
+
+    expected_rows *= 2
+    wait_for_condition(lambda: get_count() == expected_rows)
+
+    assert files_to_generate == int(
+        node.query(f"SELECT uniq(_path) from {dst_table_name}")
+    )

 def test_max_set_size(started_cluster):
     node = started_cluster.instances["instance"]