This commit is contained in:
kssenii 2023-08-01 12:19:15 +02:00
parent be458fd44b
commit 08f5ebf3e8

View File

@ -108,14 +108,15 @@ StorageS3Queue::StorageS3Queue(
if (!withGlobs())
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue engine can read only from url with globs");
std::string setting_zookeeper_path = s3queue_settings->keeper_path;
std::string zk_path_prefix;
String setting_zookeeper_path = s3queue_settings->keeper_path;
if (setting_zookeeper_path.empty())
{
auto database = DatabaseCatalog::instance().getDatabase(table_id_.database_name);
bool is_in_replicated_database = database->getEngineName() == "Replicated";
auto default_path = getContext()->getSettingsRef().s3queue_default_zookeeper_path.value;
String zk_path_prefix;
if (!default_path.empty())
{
zk_path_prefix = default_path;
@ -133,15 +134,16 @@ StorageS3Queue::StorageS3Queue(
"S3Queue keeper_path engine setting not specified, "
"s3queue_default_zookeeper_path_prefix not specified");
}
zookeeper_path = zkutil::extractZooKeeperPath(
fs::path(zk_path_prefix) / toString(table_id_.uuid), /* check_starts_with_slash */ true, log);
}
else
{
zk_path_prefix = s3queue_settings->keeper_path.value;
/// We do not add table uuid here on purpose.
zookeeper_path = zkutil::extractZooKeeperPath(s3queue_settings->keeper_path.value, /* check_starts_with_slash */ true, log);
}
zookeeper_path = zkutil::extractZooKeeperPath(
fs::path(zk_path_prefix) / toString(table_id_.uuid), /* check_starts_with_slash */ true, log);
LOG_INFO(log, "Using zookeeper path: {}", zookeeper_path);
FormatFactory::instance().checkFormatName(format_name);