Merge pull request #64845 from ClickHouse/follow-up-s3-queue

Follow-up to #64349
This commit is contained in:
Alexey Milovidov 2024-06-06 02:56:58 +02:00 committed by GitHub
commit 9ab06931d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 17 additions and 20 deletions

View File

@ -82,7 +82,6 @@ private:
const fs::path zookeeper_path;
const size_t buckets_num;
bool initialized = false;
LoggerPtr log;
std::atomic_bool shutdown_called = false;

View File

@ -13,7 +13,7 @@ class ASTStorage;
#define S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
M(S3QueueMode, \
mode, \
S3QueueMode::UNORDERED, \
S3QueueMode::ORDERED, \
"With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKepeer." \
"With ordered mode, only the max name of the successfully consumed file stored.", \
0) \

View File

@ -71,8 +71,14 @@ namespace
return zkutil::extractZooKeeperPath(result_zk_path, true);
}
void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings)
void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings, bool is_attach)
{
if (!is_attach && !s3queue_settings.mode.changed)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `mode` (Unordered/Ordered) is not specified, but is required.");
}
/// In case !is_attach, we leave Ordered mode as default for compatibility.
if (!s3queue_settings.s3queue_processing_threads_num)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");
@ -125,15 +131,7 @@ StorageS3Queue::StorageS3Queue(
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
}
if (mode == LoadingStrictnessLevel::CREATE
&& !context_->getSettingsRef().s3queue_allow_experimental_sharded_mode
&& s3queue_settings->mode == S3QueueMode::ORDERED
&& (s3queue_settings->s3queue_buckets > 1 || s3queue_settings->s3queue_processing_threads_num > 1))
{
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue sharded mode is not allowed. To enable use `s3queue_allow_experimental_sharded_mode`");
}
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef());
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format);

View File

@ -195,10 +195,10 @@ def test_create_table():
f"DeltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '{password}')",
"DNS_ERROR",
),
f"S3Queue('http://minio1:9001/root/data/', 'CSV')",
f"S3Queue('http://minio1:9001/root/data/', 'CSV', 'gzip')",
f"S3Queue('http://minio1:9001/root/data/', 'minio', '{password}', 'CSV')",
f"S3Queue('http://minio1:9001/root/data/', 'minio', '{password}', 'CSV', 'gzip')",
f"S3Queue('http://minio1:9001/root/data/', 'CSV') settings mode = 'ordered'",
f"S3Queue('http://minio1:9001/root/data/', 'CSV', 'gzip') settings mode = 'ordered'",
f"S3Queue('http://minio1:9001/root/data/', 'minio', '{password}', 'CSV') settings mode = 'ordered'",
f"S3Queue('http://minio1:9001/root/data/', 'minio', '{password}', 'CSV', 'gzip') settings mode = 'ordered'",
]
def make_test_case(i):
@ -258,10 +258,10 @@ def test_create_table():
"CREATE TABLE table14 (x int) ENGINE = S3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV', 'gzip')",
"CREATE TABLE table15 (`x` int) ENGINE = S3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')",
"CREATE TABLE table16 (`x` int) ENGINE = DeltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
"CREATE TABLE table17 (x int) ENGINE = S3Queue('http://minio1:9001/root/data/', 'CSV')",
"CREATE TABLE table18 (x int) ENGINE = S3Queue('http://minio1:9001/root/data/', 'CSV', 'gzip')",
"CREATE TABLE table19 (`x` int) ENGINE = S3Queue('http://minio1:9001/root/data/', 'minio', '[HIDDEN]', 'CSV')",
"CREATE TABLE table20 (`x` int) ENGINE = S3Queue('http://minio1:9001/root/data/', 'minio', '[HIDDEN]', 'CSV', 'gzip')",
"CREATE TABLE table17 (x int) ENGINE = S3Queue('http://minio1:9001/root/data/', 'CSV') settings mode = 'ordered'",
"CREATE TABLE table18 (x int) ENGINE = S3Queue('http://minio1:9001/root/data/', 'CSV', 'gzip') settings mode = 'ordered'",
"CREATE TABLE table19 (`x` int) ENGINE = S3Queue('http://minio1:9001/root/data/', 'minio', '[HIDDEN]', 'CSV') settings mode = 'ordered'",
"CREATE TABLE table20 (`x` int) ENGINE = S3Queue('http://minio1:9001/root/data/', 'minio', '[HIDDEN]', 'CSV', 'gzip') settings mode = 'ordered'",
],
must_not_contain=[password],
)