Better paths for s3 queue metadata

This commit is contained in:
kssenii 2023-08-31 18:10:38 +02:00
parent e67c002cb0
commit da185fc8b1
2 changed files with 12 additions and 31 deletions

View File

@ -105,7 +105,7 @@ class IColumn;
M(UInt64, s3_retry_attempts, 10, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(String, s3queue_default_zookeeper_path, "/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \
M(String, s3queue_default_zookeeper_path, "/clickhouse/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \

View File

@ -106,42 +106,23 @@ StorageS3Queue::StorageS3Queue(
if (!withGlobs())
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
String setting_zk_path = s3queue_settings->keeper_path;
if (setting_zk_path.empty())
std::string zk_path_prefix = getContext()->getSettingsRef().s3queue_default_zookeeper_path.value;
if (zk_path_prefix.empty())
zk_path_prefix = "/";
std::string result_zk_path;
if (s3queue_settings->keeper_path.changed)
{
auto database = DatabaseCatalog::instance().getDatabase(table_id_.database_name);
bool is_in_replicated_database = database->getEngineName() == "Replicated";
auto default_path = getContext()->getSettingsRef().s3queue_default_zookeeper_path.value;
String zk_path_prefix;
if (!default_path.empty())
{
zk_path_prefix = default_path;
}
else if (is_in_replicated_database)
{
LOG_INFO(log, "S3Queue engine zookeeper path is not specified. "
"Using replicated database zookeeper path");
zk_path_prefix = fs::path(assert_cast<const DatabaseReplicated *>(database.get())->getZooKeeperPath()) / "s3queue";
}
else
{
throw Exception(ErrorCodes::NO_ZOOKEEPER,
"S3Queue keeper_path engine setting not specified, "
"s3queue_default_zookeeper_path_prefix not specified");
}
zk_path = zkutil::extractZooKeeperPath(
fs::path(zk_path_prefix) / toString(table_id_.uuid), /* check_starts_with_slash */ true, log);
/// We do not add table uuid here on purpose.
result_zk_path = fs::path(zk_path_prefix) / s3queue_settings->keeper_path.value;
}
else
{
/// We do not add table uuid here on purpose.
zk_path = zkutil::extractZooKeeperPath(s3queue_settings->keeper_path.value, /* check_starts_with_slash */ true, log);
auto database_uuid = DatabaseCatalog::instance().getDatabase(table_id_.database_name)->getUUID();
result_zk_path = fs::path(zk_path_prefix) / toString(database_uuid) / toString(table_id_.uuid);
}
zk_path = zkutil::extractZooKeeperPath(result_zk_path, true/* check_starts_with_slash */, log);
LOG_INFO(log, "Using zookeeper path: {}", zk_path);
FormatFactory::instance().checkFormatName(configuration.format);