Validate settings with keeper, add a test

This commit is contained in:
kssenii 2024-01-29 12:36:00 +01:00
parent 0a21e6aa5d
commit ef252d8376
3 changed files with 114 additions and 4 deletions

View File

@ -16,8 +16,22 @@ namespace DB
namespace ErrorCodes
{
extern const int METADATA_MISMATCH;
extern const int BAD_ARGUMENTS;
}
namespace
{
    /// Convert a textual S3Queue mode ("ordered" / "unordered") to the enum value.
    /// Any other string is rejected with BAD_ARGUMENTS.
    S3QueueMode modeFromString(const std::string & mode)
    {
        if (mode == "unordered")
            return S3QueueMode::UNORDERED;
        else if (mode == "ordered")
            return S3QueueMode::ORDERED;
        else
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected S3Queue mode: {}", mode);
    }
}
S3QueueTableMetadata::S3QueueTableMetadata(
const StorageS3::Configuration & configuration,
const S3QueueSettings & engine_settings,
@ -28,10 +42,11 @@ S3QueueTableMetadata::S3QueueTableMetadata(
mode = engine_settings.mode.toString();
s3queue_tracked_files_limit = engine_settings.s3queue_tracked_files_limit;
s3queue_tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec;
s3queue_total_shards_num = engine_settings.s3queue_total_shards_num;
s3queue_processing_threads_num = engine_settings.s3queue_processing_threads_num;
columns = storage_metadata.getColumns().toString();
}
String S3QueueTableMetadata::toString() const
{
Poco::JSON::Object json;
@ -39,6 +54,8 @@ String S3QueueTableMetadata::toString() const
json.set("mode", mode);
json.set("s3queue_tracked_files_limit", s3queue_tracked_files_limit);
json.set("s3queue_tracked_file_ttl_sec", s3queue_tracked_file_ttl_sec);
json.set("s3queue_total_shards_num", s3queue_total_shards_num);
json.set("s3queue_processing_threads_num", s3queue_processing_threads_num);
json.set("format_name", format_name);
json.set("columns", columns);
@ -58,6 +75,10 @@ void S3QueueTableMetadata::read(const String & metadata_str)
s3queue_tracked_file_ttl_sec = json->getValue<UInt64>("s3queue_tracked_file_ttl_sec");
format_name = json->getValue<String>("format_name");
columns = json->getValue<String>("columns");
if (json->has("s3queue_total_shards_num"))
s3queue_total_shards_num = json->getValue<UInt64>("s3queue_total_shards_num");
if (json->has("s3queue_processing_threads_num"))
s3queue_processing_threads_num = json->getValue<UInt64>("s3queue_processing_threads_num");
}
S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)
@ -67,7 +88,6 @@ S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)
return metadata;
}
void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata & from_zk) const
{
if (after_processing != from_zk.after_processing)
@ -83,8 +103,8 @@ void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in engine mode. "
"Stored in ZooKeeper: {}, local: {}",
DB::toString(from_zk.mode),
DB::toString(mode));
from_zk.mode,
mode);
if (s3queue_tracked_files_limit != from_zk.s3queue_tracked_files_limit)
throw Exception(
@ -109,6 +129,28 @@ void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata
"Stored in ZooKeeper: {}, local: {}",
from_zk.format_name,
format_name);
if (modeFromString(mode) == S3QueueMode::ORDERED)
{
if (s3queue_processing_threads_num != from_zk.s3queue_processing_threads_num)
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in s3queue_processing_threads_num setting. "
"Stored in ZooKeeper: {}, local: {}",
from_zk.s3queue_processing_threads_num,
s3queue_processing_threads_num);
}
if (s3queue_total_shards_num != from_zk.s3queue_total_shards_num)
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in s3queue_total_shards_num setting. "
"Stored in ZooKeeper: {}, local: {}",
from_zk.s3queue_total_shards_num,
s3queue_total_shards_num);
}
}
}
void S3QueueTableMetadata::checkEquals(const S3QueueTableMetadata & from_zk) const

View File

@ -23,6 +23,8 @@ struct S3QueueTableMetadata
String mode;
UInt64 s3queue_tracked_files_limit;
UInt64 s3queue_tracked_file_ttl_sec;
UInt64 s3queue_total_shards_num;
UInt64 s3queue_processing_threads_num;
S3QueueTableMetadata() = default;
S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings, const StorageInMemoryMetadata & storage_metadata);

View File

@ -165,6 +165,7 @@ def create_table(
file_format="CSV",
auth=DEFAULT_AUTH,
bucket=None,
expect_error=False,
):
auth_params = ",".join(auth)
bucket = started_cluster.minio_bucket if bucket is None else bucket
@ -184,6 +185,10 @@ def create_table(
ENGINE = S3Queue('{url}', {auth_params}, {file_format})
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
if expect_error:
return node.query_and_get_error(create_query)
node.query(create_query)
@ -1200,3 +1205,64 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
zk = started_cluster.get_kazoo_client("zoo1")
processed_nodes = zk.get_children(f"{keeper_path}/processed/")
assert len(processed_nodes) == shards_num * processing_threads
def test_settings_check(started_cluster):
    """Settings that are immutable once stored in Keeper must be validated on attach.

    Creates an 'ordered'-mode S3Queue table on one node, then attempts to create
    the same table (same keeper_path) on a second node with differing
    s3queue_total_shards_num / s3queue_processing_threads_num, expecting a
    metadata-mismatch error from the server in both cases.
    """
    node = started_cluster.instances["instance"]
    node_2 = started_cluster.instances["instance2"]
    # Plain string: there is nothing to interpolate here (was a no-op f-string).
    table_name = "test_settings_check"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    mode = "ordered"
    shard_num = 0

    def make_settings(threads, shards):
        # Only the two validated settings vary between the calls below.
        return {
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": threads,
            "s3queue_total_shards_num": shards,
            "s3queue_current_shard_num": shard_num,
        }

    # First creation succeeds and stores the settings in Keeper.
    create_table(
        started_cluster,
        node,
        table_name,
        mode,
        files_path,
        additional_settings=make_settings(threads=5, shards=2),
    )

    # Mismatching total_shards_num must be rejected.
    error = create_table(
        started_cluster,
        node_2,
        table_name,
        mode,
        files_path,
        additional_settings=make_settings(threads=5, shards=3),
        expect_error=True,
    )
    assert (
        "Existing table metadata in ZooKeeper differs in s3queue_total_shards_num setting. Stored in ZooKeeper: 2, local: 3"
        in error
    )

    # Mismatching processing_threads_num must be rejected as well.
    error = create_table(
        started_cluster,
        node_2,
        table_name,
        mode,
        files_path,
        additional_settings=make_settings(threads=2, shards=2),
        expect_error=True,
    )
    assert (
        "Existing table metadata in ZooKeeper differs in s3queue_processing_threads_num setting. Stored in ZooKeeper: 5, local: 2"
        in error
    )