Fix s3Cluster schema inference in parallel distributed insert select (#46381)

* Fix s3Cluster schema inference in parallel distributed insert select
* Try fix flaky test
* Try SYSTEM SYNC REPLICA to avoid test flakiness
This commit is contained in:
Kruglov Pavel 2023-02-15 15:30:43 +01:00 committed by GitHub
parent 88c32002e5
commit 4f380370a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 46 additions and 15 deletions

View File

@ -230,7 +230,7 @@ StorageDeltaLake::StorageDeltaLake(
if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
new_configuration, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
@ -272,7 +272,7 @@ ColumnsDescription StorageDeltaLake::getTableStructureFromData(
{
StorageS3::updateS3Configuration(ctx, configuration);
auto new_configuration = getAdjustedS3Configuration(ctx, configuration, &Poco::Logger::get("StorageDeltaLake"));
return StorageS3::getTableStructureFromData(new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
return StorageS3::getTableStructureFromData(new_configuration, format_settings, ctx, /*object_infos*/ nullptr);
}
void registerStorageDeltaLake(StorageFactory & factory)

View File

@ -163,7 +163,7 @@ StorageHudi::StorageHudi(
if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(
new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
new_configuration, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
@ -203,7 +203,7 @@ ColumnsDescription StorageHudi::getTableStructureFromData(
{
StorageS3::updateS3Configuration(ctx, configuration);
auto new_configuration = getAdjustedS3Configuration(configuration, &Poco::Logger::get("StorageDeltaLake"));
return StorageS3::getTableStructureFromData(new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
return StorageS3::getTableStructureFromData(new_configuration, format_settings, ctx, /*object_infos*/ nullptr);
}
void registerStorageHudi(StorageFactory & factory)

View File

@ -961,7 +961,6 @@ StorageS3::StorageS3(
format_name,
s3_configuration,
compression_method,
distributed_processing_,
is_key_with_globs,
format_settings,
context_,
@ -1369,14 +1368,13 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
ColumnsDescription StorageS3::getTableStructureFromData(
StorageS3::Configuration & configuration,
bool distributed_processing,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
ObjectInfos * object_infos)
{
updateS3Configuration(ctx, configuration);
return getTableStructureFromDataImpl(
configuration.format, configuration, configuration.compression_method, distributed_processing,
configuration.format, configuration, configuration.compression_method,
configuration.url.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, object_infos);
}
@ -1384,7 +1382,6 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
const String & format,
const Configuration & s3_configuration,
const String & compression_method,
bool distributed_processing,
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
@ -1396,7 +1393,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
s3_configuration,
{s3_configuration.url.key},
is_key_with_globs,
distributed_processing,
false,
ctx, nullptr,
{}, object_infos, &read_keys);

View File

@ -291,7 +291,6 @@ public:
static ColumnsDescription getTableStructureFromData(
StorageS3::Configuration & configuration,
bool distributed_processing,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
ObjectInfos * object_infos = nullptr);
@ -338,7 +337,6 @@ private:
const String & format,
const Configuration & s3_configuration,
const String & compression_method,
bool distributed_processing,
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,

View File

@ -66,8 +66,8 @@ StorageS3Cluster::StorageS3Cluster(
/// `distributed_processing` is set to false, because this code is executed on the initiator, so there is no callback set
/// for asking for the next tasks.
/// `format_settings` is set to std::nullopt, because StorageS3Cluster is used only as table function
auto columns = StorageS3::getTableStructureFromDataImpl(format_name, s3_configuration, compression_method,
/*distributed_processing_*/false, is_key_with_globs, /*format_settings=*/std::nullopt, context_);
auto columns = StorageS3::getTableStructureFromDataImpl(
format_name, s3_configuration, compression_method, is_key_with_globs, /*format_settings=*/std::nullopt, context_);
storage_metadata.setColumns(columns);
}
else

View File

@ -139,7 +139,7 @@ ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context)
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
return StorageS3::getTableStructureFromData(configuration, std::nullopt, context);
}
return parseColumnsListFromString(configuration.structure, context);

View File

@ -82,7 +82,7 @@ ColumnsDescription TableFunctionS3Cluster::getActualTableStructure(ContextPtr co
context->checkAccess(getSourceAccessType());
if (configuration.structure == "auto")
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
return StorageS3::getTableStructureFromData(configuration, std::nullopt, context);
return parseColumnsListFromString(configuration.structure, context);
}

View File

@ -326,3 +326,39 @@ def test_distributed_insert_select_with_replicated(started_cluster):
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_replicated_local ON CLUSTER 'first_shard' SYNC;"""
)
def test_parallel_distributed_insert_select_with_schema_inference(started_cluster):
node = started_cluster.instances["s0_0_0"]
node.query(
"""DROP TABLE IF EXISTS parallel_insert_select ON CLUSTER 'first_shard' SYNC;"""
)
node.query(
"""
CREATE TABLE parallel_insert_select ON CLUSTER 'first_shard' (a String, b UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select_with_replicated', '{replica}')
ORDER BY (a, b);
"""
)
node.query(
"""
INSERT INTO parallel_insert_select SELECT * FROM s3Cluster(
'first_shard',
'http://minio1:9001/root/data/generated/*.csv', 'minio', 'minio123', 'CSV'
) SETTINGS parallel_distributed_insert_select=1, use_structure_from_insertion_table_in_table_functions=0;
"""
)
node.query("SYSTEM SYNC REPLICA parallel_insert_select")
actual_count = int(
node.query(
"SELECT count() FROM s3('http://minio1:9001/root/data/generated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64')"
)
)
count = int(node.query("SELECT count() FROM parallel_insert_select"))
assert count == actual_count