Fix tests

avogar 2024-01-24 17:55:31 +00:00
parent 93fbe1d9c8
commit 11f1ea50d7
8 changed files with 79 additions and 39 deletions

@@ -226,10 +226,12 @@ try
             continue;
         }
 
-        SchemaReaderPtr schema_reader;
+        std::unique_ptr<PeekableReadBuffer> peekable_buf; /// Can be used in format detection. Should be destroyed after schema reader.
+
         if (format_name)
         {
+            SchemaReaderPtr schema_reader;
             try
             {
                 schema_reader = FormatFactory::instance().getSchemaReader(*format_name, *iterator_data.buf, context, format_settings);
@@ -296,7 +298,6 @@ try
         /// to high memory usage as it will save all the read data from the beginning of the file,
         /// especially it will be noticeable for formats like Parquet/ORC/Arrow that do seeks to the
         /// end of file.
-        std::unique_ptr<PeekableReadBuffer> peekable_buf;
         bool support_buf_recreation = read_buffer_iterator.supportsLastReadBufferRecreation();
         if (!support_buf_recreation)
         {
@@ -310,7 +311,7 @@
         {
             try
             {
-                schema_reader = FormatFactory::instance().getSchemaReader(format_to_detect, support_buf_recreation ? *iterator_data.buf : *peekable_buf, context, format_settings);
+                SchemaReaderPtr schema_reader = FormatFactory::instance().getSchemaReader(format_to_detect, support_buf_recreation ? *iterator_data.buf : *peekable_buf, context, format_settings);
                 schema_reader->setMaxRowsAndBytesToRead(max_rows_to_read, max_bytes_to_read);
                 names_and_types = schema_reader->readSchema();
                 if (names_and_types.empty())
@@ -355,7 +356,7 @@
         {
             try
             {
-                schema_reader = FormatFactory::instance().getSchemaReader(
+                SchemaReaderPtr schema_reader = FormatFactory::instance().getSchemaReader(
                     formats_set_to_detect[i], support_buf_recreation ? *iterator_data.buf : *peekable_buf, context, format_settings);
                 schema_reader->setMaxRowsAndBytesToRead(max_rows_to_read, max_bytes_to_read);
                 auto tmp_names_and_types = schema_reader->readSchema();
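
Note: taken together, these hunks hoist `peekable_buf` to the outer scope and narrow each `schema_reader` to its own branch, relying on the C++ rule that locals are destroyed in reverse order of declaration. A minimal standalone sketch of that guarantee, with illustrative `Buffer`/`Reader` types rather than ClickHouse's:

#include <iostream>
#include <memory>

struct Buffer
{
    ~Buffer() { std::cout << "Buffer destroyed\n"; }
};

struct Reader
{
    explicit Reader(Buffer & buf_) : buf(buf_) {}
    ~Reader() { std::cout << "Reader destroyed\n"; }
    Buffer & buf; /// Non-owning reference: the buffer must outlive the reader.
};

int main()
{
    auto buf = std::make_unique<Buffer>(); /// Declared first => destroyed last.
    {
        Reader reader(*buf); /// Inner scope => destroyed before buf, so the reference never dangles.
    }
    /// Prints "Reader destroyed", then "Buffer destroyed" at the end of main.
}

Because every reader now lives in a scope inside the buffer's, each reader is destroyed first, which is what the "Should be destroyed after schema reader" comment requires.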

@@ -1147,7 +1147,7 @@ void registerStorageHDFS(StorageFactory & factory)
         }
 
         if (format_name == "auto")
-            format_name = FormatFactory::instance().getFormatFromFileName(url);
+            format_name = FormatFactory::instance().tryGetFormatFromFileName(url).value_or("auto");
 
         String compression_method;
         if (engine_args.size() == 3)
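
Note: the switch from `getFormatFromFileName` to `tryGetFormatFromFileName(...).value_or("auto")` (repeated in the storages below) swaps a throwing lookup for an optional-returning one: an unrecognized extension now leaves the format as "auto" so content-based detection can run later. A hedged sketch of the pattern; the extension table here is illustrative, not FormatFactory's real logic:

#include <iostream>
#include <map>
#include <optional>
#include <string>

/// Illustrative stand-in for FormatFactory::tryGetFormatFromFileName:
/// returns std::nullopt instead of throwing when the extension is unknown.
std::optional<std::string> tryGetFormatFromFileName(const std::string & path)
{
    static const std::map<std::string, std::string> by_extension
        = {{".tsv", "TSV"}, {".csv", "CSV"}, {".jsonl", "JSONEachRow"}, {".parquet", "Parquet"}};

    auto dot = path.rfind('.');
    if (dot == std::string::npos)
        return std::nullopt;
    auto it = by_extension.find(path.substr(dot));
    if (it == by_extension.end())
        return std::nullopt;
    return it->second;
}

int main()
{
    /// An unknown extension no longer throws; "auto" defers to content-based detection.
    std::cout << tryGetFormatFromFileName("data.myext").value_or("auto") << '\n'; // auto
    std::cout << tryGetFormatFromFileName("data.tsv").value_or("auto") << '\n';   // TSV
}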

@@ -144,7 +144,7 @@ StorageAzureBlob::Configuration StorageAzureBlob::getConfiguration(ASTs & engine
         configuration.blobs_paths = {configuration.blob_path};
 
         if (configuration.format == "auto")
-            configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path);
+            configuration.format = FormatFactory::instance().tryGetFormatFromFileName(configuration.blob_path).value_or("auto");
 
         return configuration;
     }
@@ -237,7 +237,7 @@ StorageAzureBlob::Configuration StorageAzureBlob::getConfiguration(ASTs & engine
         configuration.blobs_paths = {configuration.blob_path};
 
         if (configuration.format == "auto")
-            configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path);
+            configuration.format = FormatFactory::instance().tryGetFormatFromFileName(configuration.blob_path).value_or("auto");
 
         return configuration;
     }
@@ -1316,10 +1316,28 @@
         Data next() override
         {
-            /// For default mode check cached columns for currently read keys on first iteration.
-            if (first && getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
-            {
-                if (auto cached_columns = tryGetColumnsFromCache(read_keys.begin(), read_keys.end()))
-                    return {nullptr, cached_columns, format};
-            }
+            if (first)
+            {
+                /// If format is unknown we iterate through all currently read keys on first iteration and
+                /// try to determine format by file name.
+                if (!format)
+                {
+                    for (const auto & key : read_keys)
+                    {
+                        if (auto format_from_path = FormatFactory::instance().tryGetFormatFromFileName(key.relative_path))
+                        {
+                            format = format_from_path;
+                            break;
+                        }
+                    }
+                }
+
+                /// For default mode check cached columns for currently read keys on first iteration.
+                if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
+                {
+                    if (auto cached_columns = tryGetColumnsFromCache(read_keys.begin(), read_keys.end()))
+                        return {nullptr, cached_columns, format};
+                }
+            }
 
             current_path_with_metadata = file_iterator->next();
@@ -1345,15 +1363,33 @@
             first = false;
 
-            /// AzureBlobStorage file iterator could get new keys after new iteration, check them in schema cache if schema inference mode is default.
-            if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT && read_keys.size() > prev_read_keys_size)
-            {
-                auto columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end());
-                prev_read_keys_size = read_keys.size();
-                if (columns_from_cache)
-                    return {nullptr, columns_from_cache, format};
-            }
-            else if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION)
+            /// AzureBlobStorage file iterator could get new keys after new iteration.
+            if (read_keys.size() > prev_read_keys_size)
+            {
+                /// If format is unknown we can try to determine it by new file names.
+                if (!format)
+                {
+                    for (auto it = read_keys.begin() + prev_read_keys_size; it != read_keys.end(); ++it)
+                    {
+                        if (auto format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName((*it).relative_path))
+                        {
+                            format = format_from_file_name;
+                            break;
+                        }
+                    }
+                }
+
+                /// Check new files in schema cache if schema inference mode is default.
+                if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
+                {
+                    auto columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end());
+                    if (columns_from_cache)
+                        return {nullptr, columns_from_cache, format};
+                }
+
+                prev_read_keys_size = read_keys.size();
+            }
+
+            if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION)
             {
                 RelativePathsWithMetadata paths = {current_path_with_metadata};
                 if (auto columns_from_cache = tryGetColumnsFromCache(paths.begin(), paths.end()))
@@ -1520,7 +1556,7 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData(
     const std::optional<FormatSettings> & format_settings,
     const DB::ContextPtr & ctx)
 {
-    return getTableStructureAndFormatFromDataImpl(std::nullopt, object_storage, configuration, format_settings, ctx).first;
+    return getTableStructureAndFormatFromDataImpl(configuration.format, object_storage, configuration, format_settings, ctx).first;
 }
 
 SchemaCache & StorageAzureBlob::getSchemaCache(const ContextPtr & ctx)
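
Note: in the iterator, an unknown format is now resolved by scanning the listed keys for the first recognizable file name, and `getTableStructureFromData` passes the already-known `configuration.format` instead of `std::nullopt` so detection is skipped when the format is given. A compact sketch of the "first recognizable extension wins" scan, using plain strings in place of `RelativePathsWithMetadata` (the helper names here are illustrative, requires C++20 for ends_with):

#include <iostream>
#include <optional>
#include <string>
#include <vector>

/// Hypothetical extension lookup standing in for FormatFactory::tryGetFormatFromFileName.
std::optional<std::string> tryGetFormatFromFileName(const std::string & path)
{
    if (path.ends_with(".jsonl"))
        return "JSONEachRow";
    if (path.ends_with(".parquet"))
        return "Parquet";
    return std::nullopt;
}

/// Scan only keys added since the previous listing round; the first recognizable
/// file name fixes the format for the rest of schema inference.
std::optional<std::string> detectFormatFromNewKeys(
    const std::vector<std::string> & read_keys, size_t prev_read_keys_size)
{
    for (size_t i = prev_read_keys_size; i < read_keys.size(); ++i)
        if (auto format = tryGetFormatFromFileName(read_keys[i]))
            return format;
    return std::nullopt;
}

int main()
{
    std::vector<std::string> keys = {"dir/a.unknown", "dir/b.jsonl", "dir/c.parquet"};
    std::cout << detectFormatFromNewKeys(keys, 0).value_or("auto") << '\n'; // JSONEachRow
}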

@@ -1702,30 +1702,33 @@
                     return {nullptr, std::nullopt, format};
                 }
 
-                /// S3 file iterator could get new keys after new iteration, if format is unknown we can try to determine it by new file names.
-                if (!format && read_keys.size() > prev_read_keys_size)
-                {
-                    for (auto it = read_keys.begin() + prev_read_keys_size; it != read_keys.end(); ++it)
-                    {
-                        if (auto format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName((*it)->key))
-                        {
-                            format = format_from_file_name;
-                            break;
-                        }
-                    }
-                }
-
-                /// S3 file iterator could get new keys after new iteration, check them in schema cache if schema inference mode is default.
-                if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT && read_keys.size() > prev_read_keys_size)
-                {
-                    auto columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end());
-                    if (columns_from_cache)
-                        return {nullptr, columns_from_cache, format};
-                }
-
-                prev_read_keys_size = read_keys.size();
+                /// S3 file iterator could get new keys after new iteration.
+                if (read_keys.size() > prev_read_keys_size)
+                {
+                    /// If format is unknown we can try to determine it by new file names.
+                    if (!format)
+                    {
+                        for (auto it = read_keys.begin() + prev_read_keys_size; it != read_keys.end(); ++it)
+                        {
+                            if (auto format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName((*it)->key))
+                            {
+                                format = format_from_file_name;
+                                break;
+                            }
+                        }
+                    }
+
+                    /// Check new files in schema cache if schema inference mode is default.
+                    if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
+                    {
+                        auto columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end());
+                        if (columns_from_cache)
+                            return {nullptr, columns_from_cache, format};
+                    }
+
+                    prev_read_keys_size = read_keys.size();
+                }
 
                 if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info->info && current_key_with_info->info->size == 0)
                     continue;
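
Note: the restructured block keeps the `prev_read_keys_size` window: each listing round inspects only keys appended since the previous round, both for format detection and for the schema-cache probe, then advances the window. A toy sketch of that windowed check; the cache lookup below is a stand-in, not ClickHouse's SchemaCache API:

#include <iostream>
#include <optional>
#include <string>
#include <vector>

/// Toy stand-in for the schema cache: pretend only "old/data.csv" has cached columns.
std::optional<std::string> tryGetColumnsFromCache(
    std::vector<std::string>::const_iterator begin, std::vector<std::string>::const_iterator end)
{
    for (auto it = begin; it != end; ++it)
        if (*it == "old/data.csv")
            return "a Int64, b String";
    return std::nullopt;
}

int main()
{
    std::vector<std::string> read_keys = {"old/data.csv"};
    size_t prev_read_keys_size = read_keys.size();

    /// A new listing round appended keys; inspect only the [prev, end) window,
    /// then advance the window so the same keys are never rechecked.
    read_keys.push_back("new/data_1.csv");
    read_keys.push_back("new/data_2.csv");
    if (read_keys.size() > prev_read_keys_size)
    {
        auto columns = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end());
        std::cout << columns.value_or("no cached schema for new keys") << '\n';
        prev_read_keys_size = read_keys.size();
    }
}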

@@ -58,7 +58,7 @@ void TableFunctionAzureBlobStorage::parseArgumentsImpl(ASTs & engine_args, const
             configuration.blobs_paths = {configuration.blob_path};
 
             if (configuration.format == "auto")
-                configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path);
+                configuration.format = FormatFactory::instance().tryGetFormatFromFileName(configuration.blob_path).value_or("auto");
         }
         else
         {

@@ -600,7 +600,7 @@ def test_schema_inference_with_globs(started_cluster):
     )
 
     assert (
-        "Cannot extract table structure from JSONCompactEachRow format file" in result
+        "CANNOT_EXTRACT_TABLE_STRUCTURE" in result
     )
@@ -1044,7 +1044,7 @@ def test_union_schema_inference_mode(started_cluster):
     error = node.query_and_get_error(
         "desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
     )
-    assert "Cannot extract table structure" in error
+    assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in error
 
 
 def test_format_detection(started_cluster):

@@ -1380,7 +1380,7 @@ def test_schema_inference_from_globs(started_cluster):
     )
 
     assert (
-        "Cannot extract table structure from JSONCompactEachRow format file" in result
+        "CANNOT_EXTRACT_TABLE_STRUCTURE" in result
     )
 
     url_filename = "test{0,1,2,3}.jsoncompacteachrow"
@@ -1390,7 +1390,7 @@ def test_schema_inference_from_globs(started_cluster):
     )
 
     assert (
-        "Cannot extract table structure from JSONCompactEachRow format file" in result
+        "CANNOT_EXTRACT_TABLE_STRUCTURE" in result
     )
@@ -2193,7 +2193,7 @@ def test_union_schema_inference_mode(started_cluster):
     error = instance.query_and_get_error(
         f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference{{1,2,3,4}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
     )
-    assert "Cannot extract table structure" in error
+    assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in error
 
 
 def test_s3_format_detection(started_cluster):

@@ -58,7 +58,7 @@ SELECT * FROM \"abacaba/file.tsv\"
 """ 2>&1 | tr '\n' ' ' | grep -oF "CANNOT_EXTRACT_TABLE_STRUCTURE"
 
 ${CLICKHOUSE_CLIENT} -q "SELECT * FROM test_hdfs_4.\`http://localhost:11111/test/a.tsv\`" 2>&1 | tr '\n' ' ' | grep -oF -e "UNKNOWN_TABLE" -e "BAD_ARGUMENTS" > /dev/null && echo "OK" || echo 'FAIL' ||:
-${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_hdfs_4.\`hdfs://localhost:12222/file.myext\`" 2>&1 | tr '\n' ' ' | grep -oF -e "UNKNOWN_TABLE" -e "HDFS_ERROR" > /dev/null && echo "OK" || echo 'FAIL' ||:
+${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_hdfs_4.\`hdfs://localhost:12222/file.myext\`" 2>&1 | tr '\n' ' ' | grep -oF -e "UNKNOWN_TABLE" -e "CANNOT_EXTRACT_TABLE_STRUCTURE" > /dev/null && echo "OK" || echo 'FAIL' ||:
 ${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_hdfs_4.\`hdfs://localhost:12222/test_02725_3.tsv\`" 2>&1 | tr '\n' ' ' | grep -oF -e "UNKNOWN_TABLE" -e "CANNOT_EXTRACT_TABLE_STRUCTURE" > /dev/null && echo "OK" || echo 'FAIL' ||:
 ${CLICKHOUSE_CLIENT} --query "SELECT * FROM test_hdfs_4.\`hdfs://localhost:12222\`" 2>&1 | tr '\n' ' ' | grep -oF -e "UNKNOWN_TABLE" -e "BAD_ARGUMENTS" > /dev/null && echo "OK" || echo 'FAIL' ||: