Add union mode for schema inference to infer a union schema from files with different schemas

This commit is contained in:
avogar 2023-10-20 20:46:41 +00:00
parent 7cedfeff12
commit 6934e27e8b
41 changed files with 1029 additions and 593 deletions

View File

@ -893,11 +893,11 @@ class IColumn;
M(Bool, input_format_parquet_preserve_order, false, "Avoid reordering rows when reading from Parquet files. Usually makes it much slower.", 0) \
M(Bool, input_format_parquet_filter_push_down, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and min/max statistics in the Parquet metadata.", 0) \
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
M(Bool, input_format_orc_allow_missing_columns, false, "Allow missing columns while reading ORC input formats", 0) \
M(Bool, input_format_orc_allow_missing_columns, true, "Allow missing columns while reading ORC input formats", 0) \
M(Bool, input_format_orc_use_fast_decoder, true, "Use a faster ORC decoder implementation.", 0) \
M(Bool, input_format_parquet_allow_missing_columns, false, "Allow missing columns while reading Parquet input formats", 0) \
M(Bool, input_format_parquet_allow_missing_columns, true, "Allow missing columns while reading Parquet input formats", 0) \
M(UInt64, input_format_parquet_local_file_min_bytes_for_seek, 8192, "Min bytes required for local read (file) to do seek, instead of read with ignore in Parquet input format", 0) \
M(Bool, input_format_arrow_allow_missing_columns, false, "Allow missing columns while reading Arrow input formats", 0) \
M(Bool, input_format_arrow_allow_missing_columns, true, "Allow missing columns while reading Arrow input formats", 0) \
M(Char, input_format_hive_text_fields_delimiter, '\x01', "Delimiter between fields in Hive Text File", 0) \
M(Char, input_format_hive_text_collection_items_delimiter, '\x02', "Delimiter between collection(array or map) items in Hive Text File", 0) \
M(Char, input_format_hive_text_map_keys_delimiter, '\x03', "Delimiter between a pair of map key/values in Hive Text File", 0) \
@ -925,6 +925,7 @@ class IColumn;
M(Bool, input_format_arrow_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Arrow", 0) \
M(String, column_names_for_schema_inference, "", "The list of column names to use in schema inference for formats without column names. The format: 'column1,column2,column3,...'", 0) \
M(String, schema_inference_hints, "", "The list of column names and types to use in schema inference for formats without column names. The format: 'column_name1 column_type1, column_name2 column_type2, ...'", 0) \
M(SchemaInferenceMode, schema_inference_mode, "default", "Mode of schema inference. 'default' - assume that all files have the same schema and the schema can be inferred from any file, 'union' - files can have different schemas and the resulting schema should be a union of the schemas of all files", 0) \
M(Bool, schema_inference_make_columns_nullable, true, "If set to true, all inferred types will be Nullable in schema inference for formats without information about nullability.", 0) \
M(Bool, input_format_json_read_bools_as_numbers, true, "Allow to parse bools as numbers in JSON input formats", 0) \
M(Bool, input_format_json_try_infer_numbers_from_strings, false, "Try to infer numbers from string fields while schema inference", 0) \

View File

@ -80,6 +80,9 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.10", {{"input_format_parquet_allow_missing_columns", false, true, "Allow missing columns in Parquet files by default"},
{"input_format_orc_allow_missing_columns", false, true, "Allow missing columns in ORC files by default"},
{"input_format_arrow_allow_missing_columns", false, true, "Allow missing columns in Arrow files by default"}}},
{"23.9", {{"optimize_group_by_constant_keys", false, true, "Optimize group by constant keys by default"},
{"input_format_json_try_infer_named_tuples_from_objects", false, true, "Try to infer named Tuples from JSON objects by default"},
{"input_format_json_read_numbers_as_strings", false, true, "Allow to read numbers as strings in JSON formats by default"},

View File

@ -190,4 +190,8 @@ IMPLEMENT_SETTING_ENUM(ExternalCommandStderrReaction, ErrorCodes::BAD_ARGUMENTS,
{"log_last", ExternalCommandStderrReaction::LOG_LAST},
{"throw", ExternalCommandStderrReaction::THROW}})
IMPLEMENT_SETTING_ENUM(SchemaInferenceMode, ErrorCodes::BAD_ARGUMENTS,
{{"default", SchemaInferenceMode::DEFAULT},
{"union", SchemaInferenceMode::UNION}})
}

View File

@ -242,4 +242,12 @@ DECLARE_SETTING_ENUM(S3QueueAction)
DECLARE_SETTING_ENUM(ExternalCommandStderrReaction)
enum class SchemaInferenceMode
{
DEFAULT,
UNION,
};
DECLARE_SETTING_ENUM(SchemaInferenceMode)
}

View File

@ -1,12 +1,9 @@
#include <DataTypes/DataTypeMap.h>
#include <Formats/ReadSchemaUtils.h>
#include <Interpreters/Context.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Storages/IStorage.h>
#include <Common/assert_cast.h>
#include <IO/WithFileName.h>
#include <IO/WithFileSize.h>
#include <IO/EmptyReadBuffer.h>
namespace DB
{
@ -55,6 +52,10 @@ ColumnsDescription readSchemaFromFormat(
try
{
NamesAndTypesList names_and_types;
SchemaInferenceMode mode = context->getSettingsRef().schema_inference_mode;
if (mode == SchemaInferenceMode::UNION && !FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context, format_settings))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "UNION schema inference mode is not supported for format {}, because it doesn't support reading a subset of columns", format_name);
if (FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format_name))
{
auto external_schema_reader = FormatFactory::instance().getExternalSchemaReader(format_name, context, format_settings);
@ -71,6 +72,11 @@ try
}
else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name))
{
if (mode == SchemaInferenceMode::UNION)
retry = false;
std::vector<std::pair<NamesAndTypesList, String>> schemas_for_union_mode;
std::optional<ColumnsDescription> cached_columns;
std::string exception_messages;
SchemaReaderPtr schema_reader;
size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference
@ -84,7 +90,15 @@ try
try
{
read_buffer_iterator.setPreviousReadBuffer(std::move(buf));
buf = read_buffer_iterator.next();
std::tie(buf, cached_columns) = read_buffer_iterator.next();
if (cached_columns)
{
if (mode == SchemaInferenceMode::DEFAULT)
return *cached_columns;
schemas_for_union_mode.emplace_back(cached_columns->getAll(), read_buffer_iterator.getLastFileName());
continue;
}
if (!buf)
break;
@ -136,12 +150,19 @@ try
auto num_rows = schema_reader->readNumberOrRows();
if (num_rows)
read_buffer_iterator.setNumRowsToLastFile(*num_rows);
break;
/// In default mode, we finish when schema is inferred successfully from any file.
if (mode == SchemaInferenceMode::DEFAULT)
break;
if (!names_and_types.empty())
read_buffer_iterator.setSchemaToLastFile(ColumnsDescription(names_and_types));
schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFileName());
}
catch (...)
{
auto exception_message = getCurrentExceptionMessage(false);
if (schema_reader)
if (schema_reader && mode == SchemaInferenceMode::DEFAULT)
{
size_t rows_read = schema_reader->getNumRowsRead();
assert(rows_read <= max_rows_to_read);
@ -190,8 +211,58 @@ try
}
}
if (auto cached_columns = read_buffer_iterator.getCachedColumns())
return *cached_columns;
/// If we got all schemas from the cache, schema_reader can be uninitialized.
/// But we still need some of ISchemaReader's stateless methods,
/// so let's initialize it with an empty buffer.
EmptyReadBuffer empty;
if (!schema_reader)
schema_reader = FormatFactory::instance().getSchemaReader(format_name, empty, context, format_settings);
if (mode == SchemaInferenceMode::UNION)
{
Names names_order; /// Try to preserve the original column order.
std::unordered_map<String, DataTypePtr> names_to_types;
for (const auto & [schema, file_name] : schemas_for_union_mode)
{
for (const auto & [name, type] : schema)
{
auto it = names_to_types.find(name);
if (it == names_to_types.end())
{
names_order.push_back(name);
names_to_types[name] = type;
}
else
{
/// We already have a column with this name.
/// Check if the types are the same.
if (!type->equals(*it->second))
{
/// If types are not the same, try to transform them according
/// to the format to find common type.
auto new_type_copy = type;
schema_reader->transformTypesFromDifferentFilesIfNeeded(it->second, new_type_copy);
/// If types are not the same after transform, we cannot do anything, throw an exception.
if (!it->second->equals(*new_type_copy))
throw Exception(
ErrorCodes::TYPE_MISMATCH,
"Automatically inferred type {} for column '{}'{} differs from type inferred from previous files: {}",
type->getName(),
name,
file_name.empty() ? "" : " in file " + file_name,
it->second->getName());
}
}
}
}
names_and_types.clear();
for (const auto & name : names_order)
names_and_types.emplace_back(name, names_to_types[name]);
}
if (names_and_types.empty())
throw Exception(
@ -206,7 +277,7 @@ try
/// It will allow to execute simple data loading with query
/// "INSERT INTO table SELECT * FROM ..."
const auto & insertion_table = context->getInsertionTable();
if (!schema_reader->hasStrictOrderOfColumns() && !insertion_table.empty())
if (schema_reader && !schema_reader->hasStrictOrderOfColumns() && !insertion_table.empty())
{
auto storage = DatabaseCatalog::instance().getTable(insertion_table, context);
auto metadata = storage->getInMemoryMetadataPtr();
@ -226,13 +297,15 @@ try
names_and_types.erase(
std::remove_if(names_and_types.begin(), names_and_types.end(), [](const NameAndTypePair & pair) { return pair.name.empty(); }),
names_and_types.end());
return ColumnsDescription(names_and_types);
auto columns = ColumnsDescription(names_and_types);
if (mode == SchemaInferenceMode::DEFAULT)
read_buffer_iterator.setResultingSchema(columns);
return columns;
}
catch (Exception & e)
{
if (!buf)
throw;
auto file_name = getFileNameFromReadBuffer(*buf);
auto file_name = read_buffer_iterator.getLastFileName();
if (!file_name.empty())
e.addMessage(fmt::format("(in file/uri {})", file_name));
throw;
@ -256,9 +329,9 @@ SchemaCache::Key getKeyForSchemaCache(
return getKeysForSchemaCache({source}, format, format_settings, context).front();
}
static SchemaCache::Key makeSchemaCacheKey(const String & source, const String & format, const String & additional_format_info)
static SchemaCache::Key makeSchemaCacheKey(const String & source, const String & format, const String & additional_format_info, const String & schema_inference_mode)
{
return SchemaCache::Key{source, format, additional_format_info};
return SchemaCache::Key{source, format, additional_format_info, schema_inference_mode};
}
SchemaCache::Keys getKeysForSchemaCache(
@ -270,13 +343,14 @@ SchemaCache::Keys getKeysForSchemaCache(
/// For example, for Protobuf format additional information is the path to the schema
/// and message name.
String additional_format_info = FormatFactory::instance().getAdditionalInfoForSchemaCache(format, context, format_settings);
String schema_inference_mode(magic_enum::enum_name(context->getSettingsRef().schema_inference_mode.value));
SchemaCache::Keys cache_keys;
cache_keys.reserve(sources.size());
std::transform(
sources.begin(),
sources.end(),
std::back_inserter(cache_keys),
[&](const auto & source) { return makeSchemaCacheKey(source, format, additional_format_info); });
[&](const auto & source) { return makeSchemaCacheKey(source, format, additional_format_info, schema_inference_mode); });
return cache_keys;
}
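
The UNION-mode merge in readSchemaFromFormat above reduces to an order-preserving map from column name to type, plus a conflict check when the same name reappears with a different type. Below is a minimal standalone sketch of that idea in plain C++ (string type names instead of IDataType, and the format-specific transform step replaced by an exception) — an illustration, not the actual implementation:

#include <iostream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

/// One file's inferred schema: ordered list of (column name, type name).
using Schema = std::vector<std::pair<std::string, std::string>>;

/// Merge schemas of several files into a single "union" schema,
/// preserving the order in which column names were first seen.
Schema mergeSchemasUnion(const std::vector<Schema> & schemas)
{
    std::vector<std::string> names_order;
    std::unordered_map<std::string, std::string> names_to_types;

    for (const auto & schema : schemas)
    {
        for (const auto & [name, type] : schema)
        {
            auto it = names_to_types.find(name);
            if (it == names_to_types.end())
            {
                names_order.push_back(name);
                names_to_types[name] = type;
            }
            else if (it->second != type)
            {
                /// The real code first tries a format-specific transform
                /// (transformTypesFromDifferentFilesIfNeeded); here we just throw.
                throw std::runtime_error(
                    "Type " + type + " for column '" + name
                    + "' differs from previously inferred type " + it->second);
            }
        }
    }

    Schema result;
    for (const auto & name : names_order)
        result.emplace_back(name, names_to_types[name]);
    return result;
}

int main()
{
    /// file1 has columns (a, b); file2 has (b, c).
    Schema file1 = {{"a", "Int64"}, {"b", "String"}};
    Schema file2 = {{"b", "String"}, {"c", "Float64"}};

    for (const auto & [name, type] : mergeSchemasUnion({file1, file2}))
        std::cout << name << ' ' << type << '\n';   /// a Int64, b String, c Float64
}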

View File

@ -13,11 +13,23 @@ struct IReadBufferIterator
virtual void setPreviousReadBuffer(std::unique_ptr<ReadBuffer> /* buffer */) {}
virtual std::unique_ptr<ReadBuffer> next() = 0;
virtual std::optional<ColumnsDescription> getCachedColumns() { return std::nullopt; }
/// Return the read buffer of the next file or a cached schema.
/// In DEFAULT schema inference mode the cached schema can be from any file.
/// In UNION mode the cached schema can only be from the current file.
/// When there are no more files to process, return the pair (nullptr, std::nullopt).
virtual std::pair<std::unique_ptr<ReadBuffer>, std::optional<ColumnsDescription>> next() = 0;
virtual void setNumRowsToLastFile(size_t /*num_rows*/) {}
/// Set schema inferred from last file. Used for UNION mode to cache schema
/// per file.
virtual void setSchemaToLastFile(const ColumnsDescription & /*columns*/) {}
/// Set resulting inferred schema. Used for DEFAULT mode to cache schema
/// for all files.
virtual void setResultingSchema(const ColumnsDescription & /*columns*/) {}
/// Get last processed file name for better exception messages.
virtual String getLastFileName() const { return ""; }
};
struct SingleReadBufferIterator : public IReadBufferIterator
@ -27,12 +39,12 @@ public:
{
}
std::unique_ptr<ReadBuffer> next() override
std::pair<std::unique_ptr<ReadBuffer>, std::optional<ColumnsDescription>> next() override
{
if (done)
return nullptr;
return {nullptr, {}};
done = true;
return std::move(buf);
return {std::move(buf), {}};
}
private:

View File

@ -547,6 +547,54 @@ namespace
}
}
void mergeNamedTuples(DataTypes & data_types, TypeIndexesSet & type_indexes, const FormatSettings & settings, JSONInferenceInfo * json_info)
{
if (!type_indexes.contains(TypeIndex::Tuple))
return;
/// Collect all names and their types from all named tuples.
std::unordered_map<String, DataTypes> names_to_types;
/// Try to save original order of element names.
Names element_names;
for (auto & type : data_types)
{
const auto * tuple_type = typeid_cast<const DataTypeTuple *>(type.get());
if (tuple_type && tuple_type->haveExplicitNames())
{
const auto & elements = tuple_type->getElements();
const auto & names = tuple_type->getElementNames();
for (size_t i = 0; i != elements.size(); ++i)
{
if (!names_to_types.contains(names[i]))
element_names.push_back(names[i]);
names_to_types[names[i]].push_back(elements[i]);
}
}
}
/// Try to find common type for each tuple element with the same name.
DataTypes element_types;
element_types.reserve(names_to_types.size());
for (const auto & name : element_names)
{
auto types = names_to_types[name];
transformInferredTypesIfNeededImpl<true>(types, settings, json_info);
/// If some element has different types in different tuples, we can't do anything.
if (!checkIfTypesAreEqual(types))
return;
element_types.push_back(types.front());
}
DataTypePtr result_tuple = std::make_shared<DataTypeTuple>(element_types, element_names);
for (auto & type : data_types)
{
const auto * tuple_type = typeid_cast<const DataTypeTuple *>(type.get());
if (tuple_type && tuple_type->haveExplicitNames())
type = result_tuple;
}
}
template <bool is_json>
void transformInferredTypesIfNeededImpl(DataTypes & types, const FormatSettings & settings, JSONInferenceInfo * json_info)
{
@ -604,6 +652,9 @@ namespace
if (settings.json.read_objects_as_strings)
transformMapsAndStringsToStrings(data_types, type_indexes);
if (json_info && json_info->allow_merging_named_tuples)
mergeNamedTuples(data_types, type_indexes, settings, json_info);
};
transformTypesRecursively(types, transform_simple_types, transform_complex_types);
@ -1180,6 +1231,13 @@ void transformInferredJSONTypesIfNeeded(
second = std::move(types[1]);
}
void transformInferredJSONTypesFromDifferentFilesIfNeeded(DataTypePtr & first, DataTypePtr & second, const FormatSettings & settings)
{
JSONInferenceInfo json_info;
json_info.allow_merging_named_tuples = true;
transformInferredJSONTypesIfNeeded(first, second, settings, &json_info);
}
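
A hedged illustration of what this new entry point is meant to do for named Tuples inferred from different files. It assumes compilation inside the ClickHouse codebase (the includes and the DataTypeTuple constructor are written from memory), and the expected result is my reading of mergeNamedTuples above rather than verified output:

#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/FormatSettings.h>
#include <Formats/SchemaInferenceUtils.h>

using namespace DB;

void mergeTuplesFromTwoFiles()
{
    /// Tuple(a Int64) inferred from the first file.
    DataTypePtr first = std::make_shared<DataTypeTuple>(
        DataTypes{std::make_shared<DataTypeInt64>()}, Strings{"a"});

    /// Tuple(a Int64, b String) inferred from the second file.
    DataTypePtr second = std::make_shared<DataTypeTuple>(
        DataTypes{std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeString>()},
        Strings{"a", "b"});

    FormatSettings settings;
    /// Element names are merged and element types with the same name must agree;
    /// here both types should end up as Tuple(a Int64, b String).
    transformInferredJSONTypesFromDifferentFilesIfNeeded(first, second, settings);
}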
void transformFinalInferredJSONTypeIfNeededImpl(DataTypePtr & data_type, const FormatSettings & settings, JSONInferenceInfo * json_info, bool remain_nothing_types = false)
{
if (!data_type)

View File

@ -14,6 +14,11 @@ struct JSONInferenceInfo
std::unordered_set<const IDataType *> numbers_parsed_from_json_strings;
/// Indicates if currently we are inferring type for Map/Object key.
bool is_object_key = false;
/// When we transform types for the same column from different files
/// we cannot use DataTypeJSONPaths for inferring named tuples from JSON objects,
/// because DataTypeJSONPaths was already finalized to a named Tuple. In this case
/// we can only merge named tuples from different files together.
bool allow_merging_named_tuples = false;
};
/// Try to determine datatype of the value in buffer/string. If the type cannot be inferred, return nullptr.
@ -64,9 +69,7 @@ void transformInferredTypesIfNeeded(DataTypePtr & first, DataTypePtr & second, c
/// from strings in json_info while inference and use it here, so we will know that Array(Int64) contains
/// integer inferred from a string.
/// Example 2:
/// When we have maps with different value types, we convert all types to JSON object type.
/// For example, if we have Map(String, UInt64) (like `{"a" : 123}`) and Map(String, String) (like `{"b" : 'abc'}`)
/// we will convert both types to Object('JSON').
/// We merge DataTypeJSONPaths types to a single DataTypeJSONPaths type with union of all JSON paths.
void transformInferredJSONTypesIfNeeded(DataTypePtr & first, DataTypePtr & second, const FormatSettings & settings, JSONInferenceInfo * json_info);
/// Make final transform for types inferred in JSON format. It does 3 types of transformation:
@ -78,6 +81,11 @@ void transformInferredJSONTypesIfNeeded(DataTypePtr & first, DataTypePtr & secon
/// 3) Converts all Nothing types to String types if input_format_json_infer_incomplete_types_as_strings is enabled.
void transformFinalInferredJSONTypeIfNeeded(DataTypePtr & data_type, const FormatSettings & settings, JSONInferenceInfo * json_info);
/// Transform types for the same column inferred from different files.
/// Does the same as transformInferredJSONTypesIfNeeded, but also merges named Tuples together,
/// because DataTypeJSONPaths types were finalized when we finished inference for a file.
void transformInferredJSONTypesFromDifferentFilesIfNeeded(DataTypePtr & first, DataTypePtr & second, const FormatSettings & settings);
/// Make type Nullable recursively:
/// - Type -> Nullable(type)
/// - Array(Type) -> Array(Nullable(Type))

View File

@ -34,7 +34,7 @@ namespace
const size_t MAX_DECIMAL256_PRECISION = 76;
const size_t MAX_DEPTH = 16;
constexpr std::array<TypeIndex, 29> simple_types
constexpr std::array<TypeIndex, 28> simple_types
{
TypeIndex::Int8,
TypeIndex::UInt8,
@ -64,7 +64,7 @@ namespace
TypeIndex::Enum16,
TypeIndex::IPv4,
TypeIndex::IPv6,
TypeIndex::UUID,
// TypeIndex::UUID,
};
constexpr std::array<TypeIndex, 5> complex_types
@ -76,7 +76,7 @@ namespace
TypeIndex::Map,
};
constexpr std::array<TypeIndex, 22> map_key_types
constexpr std::array<TypeIndex, 21> map_key_types
{
TypeIndex::Int8,
TypeIndex::UInt8,
@ -98,11 +98,11 @@ namespace
TypeIndex::IPv4,
TypeIndex::Enum8,
TypeIndex::Enum16,
TypeIndex::UUID,
// TypeIndex::UUID,
TypeIndex::LowCardinality,
};
constexpr std::array<TypeIndex, 22> suspicious_lc_types
constexpr std::array<TypeIndex, 21> suspicious_lc_types
{
TypeIndex::Int8,
TypeIndex::UInt8,
@ -125,7 +125,7 @@ namespace
TypeIndex::FixedString,
TypeIndex::IPv4,
TypeIndex::IPv6,
TypeIndex::UUID,
// TypeIndex::UUID,
};
template <bool allow_complex_types>

View File

@ -2,6 +2,7 @@
#include <Formats/SchemaInferenceUtils.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/getLeastSupertype.h>
#include <Common/logger_useful.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <boost/algorithm/string.hpp>
@ -62,6 +63,14 @@ void checkFinalInferredType(
type = removeNullable(type);
}
void ISchemaReader::transformTypesIfNeeded(DB::DataTypePtr & type, DB::DataTypePtr & new_type)
{
DataTypes types = {type, new_type};
auto least_supertype = tryGetLeastSupertype(types);
if (least_supertype)
type = new_type = least_supertype;
}
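
The new default ISchemaReader::transformTypesIfNeeded converges two types only when a least supertype exists; otherwise both are left as-is and the caller decides what to do. A small sketch of that behavior, assuming the usual tryGetLeastSupertype semantics (written from memory, not verified output):

#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>

using namespace DB;

void leastSupertypeExamples()
{
    DataTypes numeric{std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeFloat64>()};
    /// Int64 and Float64 have a least supertype, so both sides would become Float64.
    DataTypePtr merged = tryGetLeastSupertype(numeric);

    DataTypes mixed{std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeString>()};
    /// Int64 and String have no least supertype: tryGetLeastSupertype returns nullptr
    /// and transformTypesIfNeeded leaves both types unchanged.
    DataTypePtr none = tryGetLeastSupertype(mixed);
}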
IIRowSchemaReader::IIRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, DataTypePtr default_type_)
: ISchemaReader(in_)
, max_rows_to_read(format_settings_.max_rows_to_read_for_schema_inference)
@ -86,11 +95,6 @@ void IIRowSchemaReader::setContext(ContextPtr & context)
}
}
void IIRowSchemaReader::transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type)
{
transformInferredTypesIfNeeded(type, new_type, format_settings);
}
IRowSchemaReader::IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IIRowSchemaReader(in_, format_settings_), column_names(splitColumnNames(format_settings.column_names_for_schema_inference))
{

View File

@ -39,6 +39,9 @@ public:
virtual void setMaxRowsAndBytesToRead(size_t, size_t) {}
virtual size_t getNumRowsRead() const { return 0; }
virtual void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type);
virtual void transformTypesFromDifferentFilesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) { transformTypesIfNeeded(type, new_type); }
virtual ~ISchemaReader() = default;
protected:
@ -55,8 +58,6 @@ public:
bool needContext() const override { return !hints_str.empty(); }
void setContext(ContextPtr & context) override;
virtual void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type);
protected:
void setMaxRowsAndBytesToRead(size_t max_rows, size_t max_bytes) override
{

View File

@ -230,6 +230,11 @@ void JSONColumnsSchemaReaderBase::transformTypesIfNeeded(DataTypePtr & type, Dat
transformInferredJSONTypesIfNeeded(type, new_type, format_settings, &inference_info);
}
void JSONColumnsSchemaReaderBase::transformTypesFromDifferentFilesIfNeeded(DataTypePtr & type, DataTypePtr & new_type)
{
transformInferredJSONTypesFromDifferentFilesIfNeeded(type, new_type, format_settings);
}
NamesAndTypesList JSONColumnsSchemaReaderBase::readSchema()
{
std::unordered_map<String, DataTypePtr> names_to_types;

View File

@ -80,7 +80,8 @@ class JSONColumnsSchemaReaderBase : public ISchemaReader
public:
JSONColumnsSchemaReaderBase(ReadBuffer & in_, const FormatSettings & format_settings_, std::unique_ptr<JSONColumnsReaderBase> reader_);
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type);
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
void transformTypesFromDifferentFilesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
bool needContext() const override { return !hints_str.empty(); }
void setContext(ContextPtr & ctx) override;

View File

@ -228,6 +228,11 @@ void JSONCompactEachRowRowSchemaReader::transformTypesIfNeeded(DataTypePtr & typ
transformInferredJSONTypesIfNeeded(type, new_type, format_settings, &inference_info);
}
void JSONCompactEachRowRowSchemaReader::transformTypesFromDifferentFilesIfNeeded(DataTypePtr & type, DataTypePtr & new_type)
{
transformInferredJSONTypesFromDifferentFilesIfNeeded(type, new_type, format_settings);
}
void JSONCompactEachRowRowSchemaReader::transformFinalTypeIfNeeded(DataTypePtr & type)
{
transformFinalInferredJSONTypeIfNeeded(type, format_settings, &inference_info);

View File

@ -92,6 +92,7 @@ private:
std::optional<DataTypes> readRowAndGetDataTypesImpl() override;
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
void transformTypesFromDifferentFilesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
void transformFinalTypeIfNeeded(DataTypePtr & type) override;
JSONCompactEachRowFormatReader reader;

View File

@ -365,6 +365,11 @@ void JSONEachRowSchemaReader::transformTypesIfNeeded(DataTypePtr & type, DataTyp
transformInferredJSONTypesIfNeeded(type, new_type, format_settings, &inference_info);
}
void JSONEachRowSchemaReader::transformTypesFromDifferentFilesIfNeeded(DB::DataTypePtr & type, DB::DataTypePtr & new_type)
{
transformInferredJSONTypesFromDifferentFilesIfNeeded(type, new_type, format_settings);
}
void JSONEachRowSchemaReader::transformFinalTypeIfNeeded(DataTypePtr & type)
{
transformFinalInferredJSONTypeIfNeeded(type, format_settings, &inference_info);

View File

@ -104,6 +104,7 @@ public:
private:
NamesAndTypesList readRowAndGetNamesAndDataTypes(bool & eof) override;
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
void transformTypesFromDifferentFilesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
void transformFinalTypeIfNeeded(DataTypePtr & type) override;
bool first_row = true;

View File

@ -462,6 +462,11 @@ std::optional<DataTypes> MySQLDumpSchemaReader::readRowAndGetDataTypes()
return data_types;
}
void MySQLDumpSchemaReader::transformTypesIfNeeded(DB::DataTypePtr & type, DB::DataTypePtr & new_type)
{
transformInferredTypesIfNeeded(type, new_type, format_settings);
}
void registerInputFormatMySQLDump(FormatFactory & factory)
{
factory.registerInputFormat("MySQLDump", [](

View File

@ -37,6 +37,7 @@ public:
private:
NamesAndTypesList readSchema() override;
std::optional<DataTypes> readRowAndGetDataTypes() override;
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
String table_name;
};

View File

@ -90,7 +90,6 @@ private:
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
using EscapingRule = FormatSettings::EscapingRule;
RegexpFieldExtractor field_extractor;
PeekableReadBuffer buf;

View File

@ -701,6 +701,11 @@ std::optional<DataTypes> ValuesSchemaReader::readRowAndGetDataTypes()
return data_types;
}
void ValuesSchemaReader::transformTypesIfNeeded(DB::DataTypePtr & type, DB::DataTypePtr & new_type)
{
transformInferredTypesIfNeeded(type, new_type, format_settings);
}
void registerInputFormatValues(FormatFactory & factory)
{
factory.registerInputFormat("Values", [](

View File

@ -111,6 +111,7 @@ public:
private:
std::optional<DataTypes> readRowAndGetDataTypes() override;
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
PeekableReadBuffer buf;
ParserExpression parser;

View File

@ -561,5 +561,10 @@ std::vector<String> FormatWithNamesAndTypesSchemaReader::readNamesFromFields(con
return names;
}
void FormatWithNamesAndTypesSchemaReader::transformTypesIfNeeded(DB::DataTypePtr & type, DB::DataTypePtr & new_type)
{
transformInferredTypesIfNeeded(type, new_type, format_settings);
}
}

View File

@ -172,6 +172,8 @@ public:
NamesAndTypesList readSchema() override;
void transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) override;
protected:
virtual std::optional<DataTypes> readRowAndGetDataTypes() override;

View File

@ -29,10 +29,11 @@ public:
String source;
String format;
String additional_format_info;
String schema_inference_mode;
bool operator==(const Key & other) const
{
return source == other.source && format == other.format && additional_format_info == other.additional_format_info;
return source == other.source && format == other.format && additional_format_info == other.additional_format_info && schema_inference_mode == other.schema_inference_mode;
}
};
@ -42,7 +43,7 @@ public:
{
size_t operator()(const Key & key) const
{
return std::hash<String>()(key.source + key.format + key.additional_format_info);
return std::hash<String>()(key.source + key.format + key.additional_format_info + key.schema_inference_mode);
}
};
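
Including the inference mode in the key means schemas cached under 'default' and 'union' modes can never answer for each other, since both operator== and the hash now take it into account. A hedged sketch (the header path and the literal mode strings, produced by magic_enum::enum_name, are assumptions):

#include <cassert>
#include <Storages/Cache/SchemaCache.h>

using namespace DB;

void modeIsPartOfTheKey()
{
    /// Same source, format and additional info; only the inference mode differs.
    SchemaCache::Key default_key{"s3://bucket/data.parquet", "Parquet", "", "DEFAULT"};
    SchemaCache::Key union_key{"s3://bucket/data.parquet", "Parquet", "", "UNION"};

    /// The keys compare unequal, so a schema cached in 'default' mode
    /// is never reused by 'union' mode and vice versa.
    assert(!(default_key == union_key));
}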

View File

@ -302,10 +302,17 @@ namespace
{
}
std::unique_ptr<ReadBuffer> next() override
std::pair<std::unique_ptr<ReadBuffer>, std::optional<ColumnsDescription>> next() override
{
StorageHDFS::PathWithInfo path_with_info;
bool is_first = current_index == 0;
/// For default mode check cached columns for all paths on first iteration.
if (is_first && getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
{
if (auto cached_columns = tryGetColumnsFromCache(paths_with_info))
return {nullptr, cached_columns};
}
StorageHDFS::PathWithInfo path_with_info;
while (true)
{
@ -315,26 +322,33 @@ namespace
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. "
"You must specify table structure manually", format);
return nullptr;
return {nullptr, std::nullopt};
}
path_with_info = paths_with_info[current_index++];
if (getContext()->getSettingsRef().hdfs_skip_empty_files && path_with_info.info && path_with_info.info->size == 0)
continue;
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION)
{
std::vector<StorageHDFS::PathWithInfo> paths = {path_with_info};
if (auto cached_columns = tryGetColumnsFromCache(paths))
return {nullptr, cached_columns};
}
auto compression = chooseCompressionMethod(path_with_info.path, compression_method);
auto impl = std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_with_info.path, getContext()->getGlobalContext()->getConfigRef(), getContext()->getReadSettings());
if (!getContext()->getSettingsRef().hdfs_skip_empty_files || !impl->eof())
{
const Int64 zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
return wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max));
return {wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max)), std::nullopt};
}
}
}
void setNumRowsToLastFile(size_t num_rows) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3)
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_hdfs)
return;
String source = uri_without_path + paths_with_info[current_index - 1].path;
@ -342,7 +356,68 @@ namespace
StorageHDFS::getSchemaCache(getContext()).addNumRows(key, num_rows);
}
void setSchemaToLastFile(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_hdfs
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::UNION)
return;
String source = uri_without_path + paths_with_info[current_index - 1].path;
auto key = getKeyForSchemaCache(source, format, std::nullopt, getContext());
StorageHDFS::getSchemaCache(getContext()).addColumns(key, columns);
}
void setResultingSchema(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_hdfs
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::DEFAULT)
return;
Strings sources;
sources.reserve(paths_with_info.size());
std::transform(paths_with_info.begin(), paths_with_info.end(), std::back_inserter(sources), [&](const StorageHDFS::PathWithInfo & path_with_info){ return uri_without_path + path_with_info.path; });
auto cache_keys = getKeysForSchemaCache(sources, format, {}, getContext());
StorageHDFS::getSchemaCache(getContext()).addManyColumns(cache_keys, columns);
}
String getLastFileName() const override
{
if (current_index != 0)
return paths_with_info[current_index - 1].path;
return "";
}
private:
std::optional<ColumnsDescription> tryGetColumnsFromCache(const std::vector<StorageHDFS::PathWithInfo> & paths_with_info_)
{
auto & schema_cache = StorageHDFS::getSchemaCache(getContext());
for (const auto & path_with_info : paths_with_info_)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
if (path_with_info.info)
return path_with_info.info->last_mod_time;
auto builder = createHDFSBuilder(uri_without_path + "/", getContext()->getGlobalContext()->getConfigRef());
auto fs = createHDFSFS(builder.get());
HDFSFileInfoPtr hdfs_info(hdfsGetPathInfo(fs.get(), path_with_info.path.c_str()));
if (hdfs_info)
return hdfs_info->mLastMod;
return std::nullopt;
};
String url = uri_without_path + path_with_info.path;
auto cache_key = getKeyForSchemaCache(url, format, {}, getContext());
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
const std::vector<StorageHDFS::PathWithInfo> & paths_with_info;
const String & uri_without_path;
const String & format;
@ -366,25 +441,8 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
"Cannot extract table structure from {} format file, because there are no files in HDFS with provided path."
" You must specify table structure manually", format);
std::optional<ColumnsDescription> columns_from_cache;
if (ctx->getSettingsRef().schema_inference_use_cache_for_hdfs)
columns_from_cache = tryGetColumnsFromCache(paths_with_info, uri_without_path, format, ctx);
ColumnsDescription columns;
if (columns_from_cache)
{
columns = *columns_from_cache;
}
else
{
ReadBufferIterator read_buffer_iterator(paths_with_info, uri_without_path, format, compression_method, ctx);
columns = readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths_with_info.size() > 1, ctx);
}
if (ctx->getSettingsRef().schema_inference_use_cache_for_hdfs)
addColumnsToCache(paths_with_info, uri_without_path, columns, format, ctx);
return columns;
ReadBufferIterator read_buffer_iterator(paths_with_info, uri_without_path, format, compression_method, ctx);
return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths_with_info.size() > 1, ctx);
}
class HDFSSource::DisclosedGlobIterator::Impl
@ -1017,54 +1075,6 @@ SchemaCache & StorageHDFS::getSchemaCache(const ContextPtr & ctx)
return schema_cache;
}
std::optional<ColumnsDescription> StorageHDFS::tryGetColumnsFromCache(
const std::vector<StorageHDFS::PathWithInfo> & paths_with_info,
const String & uri_without_path,
const String & format_name,
const ContextPtr & ctx)
{
auto & schema_cache = getSchemaCache(ctx);
for (const auto & path_with_info : paths_with_info)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
if (path_with_info.info)
return path_with_info.info->last_mod_time;
auto builder = createHDFSBuilder(uri_without_path + "/", ctx->getGlobalContext()->getConfigRef());
auto fs = createHDFSFS(builder.get());
HDFSFileInfoPtr hdfs_info(hdfsGetPathInfo(fs.get(), path_with_info.path.c_str()));
if (hdfs_info)
return hdfs_info->mLastMod;
return std::nullopt;
};
String url = uri_without_path + path_with_info.path;
auto cache_key = getKeyForSchemaCache(url, format_name, {}, ctx);
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void StorageHDFS::addColumnsToCache(
const std::vector<StorageHDFS::PathWithInfo> & paths_with_info,
const String & uri_without_path,
const ColumnsDescription & columns,
const String & format_name,
const ContextPtr & ctx)
{
auto & schema_cache = getSchemaCache(ctx);
Strings sources;
sources.reserve(paths_with_info.size());
std::transform(paths_with_info.begin(), paths_with_info.end(), std::back_inserter(sources), [&](const PathWithInfo & path_with_info){ return uri_without_path + path_with_info.path; });
auto cache_keys = getKeysForSchemaCache(sources, format_name, {}, ctx);
schema_cache.addManyColumns(cache_keys, columns);
}
}
#endif
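
This HDFS iterator, and the Azure, File and S3 iterators changed below, all repeat the same mode-dependent caching policy. A standalone toy sketch of just that policy, with hypothetical names and no ClickHouse types:

#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

enum class SchemaInferenceMode { DEFAULT, UNION };

struct Schema { std::vector<std::pair<std::string, std::string>> columns; };

/// Hypothetical stand-in for StorageXXX::getSchemaCache(), keyed by path only.
static std::map<std::string, Schema> schema_cache;

std::optional<Schema> tryGetCachedSchema(const std::string & path)
{
    if (auto it = schema_cache.find(path); it != schema_cache.end())
        return it->second;
    return std::nullopt;
}

void cacheSchema(const std::vector<std::string> & paths, const Schema & schema)
{
    for (const auto & path : paths)
        schema_cache[path] = schema;
}

/// Cache lookup before reading a file (the check at the top of next()):
///  - DEFAULT: only on the very first call, any path with a cached schema answers for all files;
///  - UNION:   only the current file's cached schema may be reused.
std::optional<Schema> checkCacheBeforeFile(
    SchemaInferenceMode mode, bool is_first,
    const std::vector<std::string> & all_paths, const std::string & current_path)
{
    if (mode == SchemaInferenceMode::DEFAULT)
    {
        if (!is_first)
            return std::nullopt;
        for (const auto & path : all_paths)
            if (auto cached = tryGetCachedSchema(path))
                return cached;
        return std::nullopt;
    }
    return tryGetCachedSchema(current_path);
}

/// Cache write after inference:
///  - UNION:   cache the per-file schema (setSchemaToLastFile);
///  - DEFAULT: cache the resulting schema under every path (setResultingSchema).
void cacheAfterInference(
    SchemaInferenceMode mode, const std::vector<std::string> & all_paths,
    const std::string & current_path, const Schema & schema)
{
    if (mode == SchemaInferenceMode::UNION)
        cacheSchema({current_path}, schema);
    else
        cacheSchema(all_paths, schema);
}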

View File

@ -94,19 +94,6 @@ protected:
friend class HDFSSource;
private:
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const std::vector<StorageHDFS::PathWithInfo> & paths_with_info,
const String & uri_without_path,
const String & format_name,
const ContextPtr & ctx);
static void addColumnsToCache(
const std::vector<StorageHDFS::PathWithInfo> & paths,
const String & uri_without_path,
const ColumnsDescription & columns,
const String & format_name,
const ContextPtr & ctx);
std::vector<String> uris;
String format_name;
String compression_method;

View File

@ -1215,11 +1215,18 @@ namespace
{
}
std::unique_ptr<ReadBuffer> next() override
std::pair<std::unique_ptr<ReadBuffer>, std::optional<ColumnsDescription>> next() override
{
auto [key, metadata] = file_iterator->next();
/// For default mode check cached columns for currently read keys on first iteration.
if (first && getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
{
if (auto cached_columns = tryGetColumnsFromCache(read_keys.begin(), read_keys.end()))
return {nullptr, cached_columns};
}
if (key.empty())
current_path_with_metadata = file_iterator->next();
if (current_path_with_metadata.relative_path.empty())
{
if (first)
throw Exception(
@ -1227,49 +1234,102 @@ namespace
"Cannot extract table structure from {} format file, because there are no files with provided path "
"in AzureBlobStorage. You must specify table structure manually", configuration.format);
return nullptr;
return {nullptr, std::nullopt};
}
current_path = key;
first = false;
///AzureBlobStorage file iterator could get new keys after new iteration, check them in schema cache.
if (getContext()->getSettingsRef().schema_inference_use_cache_for_azure && read_keys.size() > prev_read_keys_size)
/// AzureBlobStorage file iterator could get new keys after new iteration, check them in schema cache if schema inference mode is default.
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT && read_keys.size() > prev_read_keys_size)
{
columns_from_cache = StorageAzureBlob::tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, getContext());
auto columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end());
prev_read_keys_size = read_keys.size();
if (columns_from_cache)
return nullptr;
return {nullptr, columns_from_cache};
}
else if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION)
{
RelativePathsWithMetadata paths = {current_path_with_metadata};
if (auto columns_from_cache = tryGetColumnsFromCache(paths.begin(), paths.end()))
return {nullptr, columns_from_cache};
}
first = false;
int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
return wrapReadBufferWithCompressionMethod(
object_storage->readObject(StoredObject(key), getContext()->getReadSettings(), {}, metadata.size_bytes),
chooseCompressionMethod(key, configuration.compression_method),
zstd_window_log_max);
return {wrapReadBufferWithCompressionMethod(
object_storage->readObject(StoredObject(current_path_with_metadata.relative_path), getContext()->getReadSettings(), {}, current_path_with_metadata.metadata.size_bytes),
chooseCompressionMethod(current_path_with_metadata.relative_path, configuration.compression_method),
zstd_window_log_max), std::nullopt};
}
std::optional<ColumnsDescription> getCachedColumns() override { return columns_from_cache; }
void setNumRowsToLastFile(size_t num_rows) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3)
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_azure)
return;
String source = fs::path(configuration.connection_url) / configuration.container / current_path;
String source = fs::path(configuration.connection_url) / configuration.container / current_path_with_metadata.relative_path;
auto key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext());
StorageAzureBlob::getSchemaCache(getContext()).addNumRows(key, num_rows);
}
void setSchemaToLastFile(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_azure
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::UNION)
return;
String source = fs::path(configuration.connection_url) / configuration.container / current_path_with_metadata.relative_path;
auto key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext());
StorageAzureBlob::getSchemaCache(getContext()).addColumns(key, columns);
}
void setResultingSchema(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_azure
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::DEFAULT)
return;
auto host_and_bucket = configuration.connection_url + '/' + configuration.container;
Strings sources;
sources.reserve(read_keys.size());
std::transform(read_keys.begin(), read_keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket + '/' + elem.relative_path; });
auto cache_keys = getKeysForSchemaCache(sources, configuration.format, format_settings, getContext());
StorageAzureBlob::getSchemaCache(getContext()).addManyColumns(cache_keys, columns);
}
String getLastFileName() const override { return current_path_with_metadata.relative_path; }
private:
std::optional<ColumnsDescription> tryGetColumnsFromCache(const RelativePathsWithMetadata::const_iterator & begin, const RelativePathsWithMetadata::const_iterator & end)
{
auto & schema_cache = StorageAzureBlob::getSchemaCache(getContext());
for (auto it = begin; it < end; ++it)
{
auto get_last_mod_time = [&] -> std::optional<time_t>
{
if (it->metadata.last_modified)
return it->metadata.last_modified->epochTime();
return std::nullopt;
};
auto host_and_bucket = configuration.connection_url + '/' + configuration.container;
String source = host_and_bucket + '/' + it->relative_path;
auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext());
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
std::shared_ptr<StorageAzureBlobSource::IIterator> file_iterator;
AzureObjectStorage * object_storage;
const StorageAzureBlob::Configuration & configuration;
const std::optional<FormatSettings> & format_settings;
const RelativePathsWithMetadata & read_keys;
std::optional<ColumnsDescription> columns_from_cache;
size_t prev_read_keys_size;
String current_path;
RelativePathWithMetadata current_path_with_metadata;
bool first = true;
};
}
@ -1299,72 +1359,8 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData(
object_storage, configuration.container, configuration.blobs_paths, nullptr, NamesAndTypesList{}, ctx, &read_keys);
}
std::optional<ColumnsDescription> columns_from_cache;
if (ctx->getSettingsRef().schema_inference_use_cache_for_azure)
columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), configuration, format_settings, ctx);
ColumnsDescription columns;
if (columns_from_cache)
{
columns = *columns_from_cache;
}
else
{
ReadBufferIterator read_buffer_iterator(file_iterator, object_storage, configuration, format_settings, read_keys, ctx);
columns = readSchemaFromFormat(configuration.format, format_settings, read_buffer_iterator, configuration.withGlobs(), ctx);
}
if (ctx->getSettingsRef().schema_inference_use_cache_for_azure)
addColumnsToCache(read_keys, columns, configuration, format_settings, configuration.format, ctx);
return columns;
}
std::optional<ColumnsDescription> StorageAzureBlob::tryGetColumnsFromCache(
const RelativePathsWithMetadata::const_iterator & begin,
const RelativePathsWithMetadata::const_iterator & end,
const StorageAzureBlob::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx)
{
auto & schema_cache = getSchemaCache(ctx);
for (auto it = begin; it < end; ++it)
{
auto get_last_mod_time = [&] -> std::optional<time_t>
{
if (it->metadata.last_modified)
return it->metadata.last_modified->epochTime();
return std::nullopt;
};
auto host_and_bucket = configuration.connection_url + '/' + configuration.container;
String source = host_and_bucket + '/' + it->relative_path;
auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, ctx);
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void StorageAzureBlob::addColumnsToCache(
const RelativePathsWithMetadata & keys,
const ColumnsDescription & columns,
const StorageAzureBlob::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const String & format_name,
const ContextPtr & ctx)
{
auto host_and_bucket = configuration.connection_url + '/' + configuration.container;
Strings sources;
sources.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket + '/' + elem.relative_path; });
auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
auto & schema_cache = getSchemaCache(ctx);
schema_cache.addManyColumns(cache_keys, columns);
ReadBufferIterator read_buffer_iterator(file_iterator, object_storage, configuration, format_settings, read_keys, ctx);
return readSchemaFromFormat(configuration.format, format_settings, read_buffer_iterator, configuration.withGlobs(), ctx);
}
SchemaCache & StorageAzureBlob::getSchemaCache(const ContextPtr & ctx)

View File

@ -124,21 +124,6 @@ public:
ContextPtr ctx,
bool distributed_processing = false);
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const RelativePathsWithMetadata::const_iterator & begin,
const RelativePathsWithMetadata::const_iterator & end,
const StorageAzureBlob::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx);
static void addColumnsToCache(
const RelativePathsWithMetadata & keys,
const ColumnsDescription & columns,
const Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const String & format_name,
const ContextPtr & ctx);
private:
std::string name;
Configuration configuration;

View File

@ -444,11 +444,19 @@ namespace
{
}
std::unique_ptr<ReadBuffer> next() override
std::pair<std::unique_ptr<ReadBuffer>, std::optional<ColumnsDescription>> next() override
{
bool is_first = current_index == 0;
/// For default mode check cached columns for all paths on first iteration.
/// If we have cached columns, next() won't be called again.
if (is_first && getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
{
if (auto cached_columns = tryGetColumnsFromCache(paths))
return {nullptr, cached_columns};
}
String path;
struct stat file_stat;
bool is_first = current_index == 0;
do
{
@ -459,14 +467,21 @@ namespace
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. You must specify table structure manually",
format);
return nullptr;
return {nullptr, std::nullopt};
}
path = paths[current_index++];
file_stat = getFileStat(path, false, -1, "File");
} while (getContext()->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0);
return createReadBuffer(path, file_stat, false, -1, compression_method, getContext());
/// For union mode, check cached columns only for current path, because schema can be different for different files.
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION)
{
if (auto cached_columns = tryGetColumnsFromCache({path}))
return {nullptr, cached_columns};
}
return {createReadBuffer(path, file_stat, false, -1, compression_method, getContext()), std::nullopt};
}
void setNumRowsToLastFile(size_t num_rows) override
@ -478,7 +493,64 @@ namespace
StorageFile::getSchemaCache(getContext()).addNumRows(key, num_rows);
}
void setSchemaToLastFile(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_file
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::UNION)
return;
/// For union mode, schema can be different for different files, so we need to
/// cache last inferred schema only for last processed file.
auto cache_key = getKeyForSchemaCache(paths[current_index - 1], format, format_settings, getContext());
StorageFile::getSchemaCache(getContext()).addColumns(cache_key, columns);
}
void setResultingSchema(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_file
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::DEFAULT)
return;
/// For default mode we cache resulting schema for all paths.
auto cache_keys = getKeysForSchemaCache(paths, format, format_settings, getContext());
StorageFile::getSchemaCache(getContext()).addManyColumns(cache_keys, columns);
}
String getLastFileName() const override
{
if (current_index != 0)
return paths[current_index - 1];
return "";
}
private:
std::optional<ColumnsDescription> tryGetColumnsFromCache(const Strings & paths_)
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_file)
return std::nullopt;
/// Check if the cache contains one of the paths.
auto & schema_cache = StorageFile::getSchemaCache(getContext());
struct stat file_stat{};
for (const auto & path : paths_)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
if (0 != stat(path.c_str(), &file_stat))
return std::nullopt;
return file_stat.st_mtime;
};
auto cache_key = getKeyForSchemaCache(path, format, format_settings, getContext());
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
const std::vector<String> & paths;
size_t current_index = 0;
@ -502,8 +574,19 @@ namespace
{
}
std::unique_ptr<ReadBuffer> next() override
std::pair<std::unique_ptr<ReadBuffer>, std::optional<ColumnsDescription>> next() override
{
/// For default mode check cached columns for all initial archive paths (maybe with globs) on first iteration.
/// If we have cached columns, next() won't be called again.
if (is_first && getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
{
for (const auto & archive : archive_info.paths_to_archives)
{
if (auto cached_columns = tryGetColumnsFromSchemaCache(archive, archive_info.path_in_archive))
return {nullptr, cached_columns};
}
}
std::unique_ptr<ReadBuffer> read_buf;
while (true)
{
@ -515,7 +598,7 @@ namespace
"Cannot extract table structure from {} format file, because all files are empty. You must specify table structure manually",
format);
return nullptr;
return {nullptr, std::nullopt};
}
const auto & archive = archive_info.paths_to_archives[current_archive_index];
@ -546,11 +629,11 @@ namespace
if (!read_buf)
continue;
last_read_file_path = processed_files.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), archive_info.path_in_archive));
columns_from_cache = tryGetColumnsFromSchemaCache(archive, last_read_file_path);
last_read_file_path = paths_for_schema_cache.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), archive_info.path_in_archive));
is_first = false;
if (columns_from_cache)
return nullptr;
if (auto cached_columns = tryGetColumnsFromSchemaCache(archive, last_read_file_path))
return {nullptr, cached_columns};
}
else
{
@ -583,11 +666,17 @@ namespace
continue;
}
last_read_file_path = processed_files.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), *filename));
columns_from_cache = tryGetColumnsFromSchemaCache(archive, last_read_file_path);
last_read_file_path = paths_for_schema_cache.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), *filename));
is_first = false;
if (columns_from_cache)
return nullptr;
if (auto cached_columns = tryGetColumnsFromSchemaCache(archive, last_read_file_path))
{
/// For union mode next() will be called again even if we found cached columns,
/// so we need to remember last_read_buffer to continue iterating through files in archive.
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION)
last_read_buffer = archive_reader->readFile(std::move(file_enumerator));
return {nullptr, cached_columns};
}
read_buf = archive_reader->readFile(std::move(file_enumerator));
}
@ -595,18 +684,13 @@ namespace
break;
}
is_first = false;
return read_buf;
}
std::optional<ColumnsDescription> getCachedColumns() override
{
return columns_from_cache;
return {std::move(read_buf), std::nullopt};
}
void setPreviousReadBuffer(std::unique_ptr<ReadBuffer> buffer) override
{
last_read_buffer = std::move(buffer);
if (buffer)
last_read_buffer = std::move(buffer);
}
void setNumRowsToLastFile(size_t num_rows) override
@ -618,13 +702,45 @@ namespace
StorageFile::getSchemaCache(getContext()).addNumRows(key, num_rows);
}
std::vector<std::string> processed_files;
private:
void setSchemaToLastFile(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_file
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::UNION)
return;
/// For union mode, schema can be different for different files in archive, so we need to
/// cache last inferred schema only for last processed file.
auto & schema_cache = StorageFile::getSchemaCache(getContext());
auto cache_key = getKeyForSchemaCache(last_read_file_path, format, format_settings, getContext());
schema_cache.addColumns(cache_key, columns);
}
void setResultingSchema(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_file
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::DEFAULT)
return;
/// For default mode we cache resulting schema for all paths.
/// Also add the schema for the initial paths (maybe with globs) to the cache,
/// so next time we won't iterate through files (which can be expensive).
for (const auto & archive : archive_info.paths_to_archives)
paths_for_schema_cache.emplace_back(fmt::format("{}::{}", archive, archive_info.path_in_archive));
auto & schema_cache = StorageFile::getSchemaCache(getContext());
auto cache_keys = getKeysForSchemaCache(paths_for_schema_cache, format, format_settings, getContext());
schema_cache.addManyColumns(cache_keys, columns);
}
String getLastFileName() const override
{
return last_read_file_path;
}
private:
std::optional<ColumnsDescription> tryGetColumnsFromSchemaCache(const std::string & archive_path, const std::string & full_path)
{
auto context = getContext();
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_file)
if (!context->getSettingsRef().schema_inference_use_cache_for_file)
return std::nullopt;
struct stat file_stat;
@ -654,44 +770,13 @@ namespace
std::string last_read_file_path;
std::optional<ColumnsDescription> columns_from_cache;
std::unique_ptr<IArchiveReader::FileEnumerator> file_enumerator;
std::unique_ptr<ReadBuffer> last_read_buffer;
String format;
const std::optional<FormatSettings> & format_settings;
std::vector<std::string> paths_for_schema_cache;
};
std::optional<ColumnsDescription> tryGetColumnsFromCacheForArchives(
const StorageFile::ArchiveInfo & archive_info,
std::vector<std::string> & paths_for_schema_cache,
const String & format,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
struct stat file_stat{};
std::optional<ColumnsDescription> columns_from_cache;
for (const auto & archive : archive_info.paths_to_archives)
{
const auto & full_path = paths_for_schema_cache.emplace_back(fmt::format("{}::{}", archive, archive_info.path_in_archive));
auto & schema_cache = StorageFile::getSchemaCache(context);
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
if (0 != stat(archive.c_str(), &file_stat))
return std::nullopt;
return file_stat.st_mtime;
};
auto cache_key = getKeyForSchemaCache(full_path, format, format_settings, context);
columns_from_cache = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
}
return columns_from_cache;
}
}
ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr context)
@ -744,48 +829,19 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
"Cannot extract table structure from {} format file, because there are no files with provided path. "
"You must specify table structure manually", format);
ColumnsDescription columns;
std::vector<std::string> archive_paths_for_schema_cache;
std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().schema_inference_use_cache_for_file)
if (archive_info)
{
if (archive_info)
columns_from_cache = tryGetColumnsFromCacheForArchives(*archive_info, archive_paths_for_schema_cache, format, format_settings, context);
else
columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context);
ReadBufferFromArchiveIterator read_buffer_iterator(*archive_info, format, format_settings, context);
return readSchemaFromFormat(
format,
format_settings,
read_buffer_iterator,
/*retry=*/archive_info->paths_to_archives.size() > 1 || !archive_info->isSingleFileRead(),
context);
}
if (columns_from_cache)
{
columns = std::move(*columns_from_cache);
}
else
{
if (archive_info)
{
ReadBufferFromArchiveIterator read_buffer_iterator(*archive_info, format, format_settings, context);
columns = readSchemaFromFormat(
format,
format_settings,
read_buffer_iterator,
/*retry=*/archive_info->paths_to_archives.size() > 1 || !archive_info->isSingleFileRead(),
context);
for (auto & file : read_buffer_iterator.processed_files)
archive_paths_for_schema_cache.push_back(std::move(file));
}
else
{
ReadBufferFromFileIterator read_buffer_iterator(paths, format, compression_method, format_settings, context);
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);
}
}
if (context->getSettingsRef().schema_inference_use_cache_for_file)
addColumnsToCache(archive_info.has_value() ? archive_paths_for_schema_cache : paths, columns, format, format_settings, context);
return columns;
ReadBufferFromFileIterator read_buffer_iterator(paths, format, compression_method, format_settings, context);
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);
}
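/// Usage sketch for the two modes over local files (illustrative only; the paths are placeholders that mirror
/// the new tests added below):
///     SET schema_inference_mode = 'union';
///     DESC file('data*.jsonl');                 -- resulting schema is the union of the per-file schemas
///     DESC file('archive.tar :: data*.jsonl');  -- for archives, schemas are cached per 'archive::file' path
///     SET schema_inference_mode = 'default';
///     DESC file('data*.jsonl');                 -- all files are assumed to share one schema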
bool StorageFile::supportsSubsetOfColumns(const ContextPtr & context) const
@ -1972,43 +2028,6 @@ SchemaCache & StorageFile::getSchemaCache(const ContextPtr & context)
return schema_cache;
}
std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(
const Strings & paths, const String & format_name, const std::optional<FormatSettings> & format_settings, ContextPtr context)
{
/// Check if the cache contains one of the paths.
auto & schema_cache = getSchemaCache(context);
struct stat file_stat{};
for (const auto & path : paths)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
if (0 != stat(path.c_str(), &file_stat))
return std::nullopt;
return file_stat.st_mtime;
};
auto cache_key = getKeyForSchemaCache(path, format_name, format_settings, context);
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void StorageFile::addColumnsToCache(
const Strings & paths,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
auto cache_keys = getKeysForSchemaCache(paths, format_name, format_settings, context);
schema_cache.addManyColumns(cache_keys, columns);
}
void StorageFile::parseFileSource(String source, String & filename, String & path_to_archive)
{
size_t pos = source.find("::");

View File

@ -126,16 +126,6 @@ protected:
private:
void setStorageMetadata(CommonArguments args);
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings & paths, const String & format_name, const std::optional<FormatSettings> & format_settings, ContextPtr context);
static void addColumnsToCache(
const Strings & paths,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context);
std::string format_name;
// We use format settings from global context + CREATE query for File table
// function -- in this case, format_settings is set.

View File

@ -1499,8 +1499,15 @@ namespace
{
}
std::unique_ptr<ReadBuffer> next() override
std::pair<std::unique_ptr<ReadBuffer>, std::optional<ColumnsDescription>> next() override
{
/// For default mode check cached columns for currently read keys on first iteration.
if (first && getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
{
if (auto cached_columns = tryGetColumnsFromCache(read_keys.begin(), read_keys.end()))
return {nullptr, cached_columns};
}
while (true)
{
current_key_with_info = (*file_iterator)();
@ -1514,36 +1521,42 @@ namespace
"in S3 or all files are empty. You must specify table structure manually",
configuration.format);
return nullptr;
return {nullptr, std::nullopt};
}
/// S3 file iterator could get new keys after new iteration, check them in schema cache.
if (getContext()->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size)
/// The S3 file iterator could get new keys after a new iteration; check them in the schema cache if the schema inference mode is 'default'.
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT && read_keys.size() > prev_read_keys_size)
{
columns_from_cache = StorageS3::tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, getContext());
auto columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end());
prev_read_keys_size = read_keys.size();
if (columns_from_cache)
return nullptr;
return {nullptr, columns_from_cache};
}
if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info.info && current_key_with_info.info->size == 0)
continue;
/// In union mode, check cached columns only for current key.
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION)
{
StorageS3::KeysWithInfo keys = {current_key_with_info};
if (auto columns_from_cache = tryGetColumnsFromCache(keys.begin(), keys.end()))
{
first = false;
return {nullptr, columns_from_cache};
}
}
int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
auto impl = std::make_unique<ReadBufferFromS3>(configuration.client, configuration.url.bucket, current_key_with_info.key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings());
if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof())
{
first = false;
return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info.key, configuration.compression_method), zstd_window_log_max);
return {wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info.key, configuration.compression_method), zstd_window_log_max), std::nullopt};
}
}
}
std::optional<ColumnsDescription> getCachedColumns() override
{
return columns_from_cache;
}
void setNumRowsToLastFile(size_t num_rows) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3)
@ -1554,12 +1567,85 @@ namespace
StorageS3::getSchemaCache(getContext()).addNumRows(key, num_rows);
}
void setSchemaToLastFile(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::UNION)
return;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info.key;
auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext());
StorageS3::getSchemaCache(getContext()).addColumns(cache_key, columns);
}
void setResultingSchema(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::DEFAULT)
return;
auto host_and_bucket = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket;
Strings sources;
sources.reserve(read_keys.size());
std::transform(read_keys.begin(), read_keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem.key; });
auto cache_keys = getKeysForSchemaCache(sources, configuration.format, format_settings, getContext());
StorageS3::getSchemaCache(getContext()).addManyColumns(cache_keys, columns);
}
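/// Note on the caching split above: in 'union' mode each processed file gets its own schema cache entry
/// (setSchemaToLastFile), while in 'default' mode the single resulting schema is cached once for all
/// read keys (setResultingSchema).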
String getLastFileName() const override { return current_key_with_info.key; }
private:
std::optional<ColumnsDescription> tryGetColumnsFromCache(
const StorageS3::KeysWithInfo::const_iterator & begin,
const StorageS3::KeysWithInfo::const_iterator & end)
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3)
return std::nullopt;
auto & schema_cache = StorageS3::getSchemaCache(getContext());
for (auto it = begin; it < end; ++it)
{
auto get_last_mod_time = [&]
{
time_t last_modification_time = 0;
if (it->info)
{
last_modification_time = it->info->last_modification_time;
}
else
{
/// Note that in case of an exception in getObjectInfo the returned info will be empty,
/// but the schema cache will handle this case and won't return columns from the cache,
/// because we can't consider them valid without the last modification time.
last_modification_time = S3::getObjectInfo(
*configuration.client,
configuration.url.bucket,
it->key,
configuration.url.version_id,
configuration.request_settings,
/*with_metadata=*/ false,
/*for_disk_s3=*/ false,
/*throw_on_error= */ false).last_modification_time;
}
return last_modification_time ? std::make_optional(last_modification_time) : std::nullopt;
};
String path = fs::path(configuration.url.bucket) / it->key;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / path;
auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext());
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
std::shared_ptr<StorageS3Source::IIterator> file_iterator;
const StorageS3Source::KeysWithInfo & read_keys;
const StorageS3::Configuration & configuration;
const std::optional<FormatSettings> & format_settings;
std::optional<ColumnsDescription> columns_from_cache;
StorageS3Source::KeyWithInfo current_key_with_info;
size_t prev_read_keys_size;
bool first = true;
@ -1576,28 +1662,10 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
auto file_iterator = createFileIterator(configuration, false, ctx, nullptr, {}, &read_keys);
std::optional<ColumnsDescription> columns_from_cache;
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3)
columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), configuration, format_settings, ctx);
ColumnsDescription columns;
if (columns_from_cache)
{
columns = *columns_from_cache;
}
else
{
ReadBufferIterator read_buffer_iterator(file_iterator, read_keys, configuration, format_settings, ctx);
columns = readSchemaFromFormat(configuration.format, format_settings, read_buffer_iterator, configuration.withGlobs(), ctx);
}
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3)
addColumnsToCache(read_keys, configuration, columns, configuration.format, format_settings, ctx);
return columns;
ReadBufferIterator read_buffer_iterator(file_iterator, read_keys, configuration, format_settings, ctx);
return readSchemaFromFormat(configuration.format, format_settings, read_buffer_iterator, configuration.withGlobs(), ctx);
}
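/// Usage sketch for S3 (illustrative only; endpoint, bucket and credentials are placeholders):
///     DESC s3('https://endpoint/bucket/data*.jsonl', 'KEY', 'SECRET')
///     SETTINGS schema_inference_mode = 'union';
/// In 'union' mode every matched key ends up with its own entry in the schema cache; the default mode
/// stores the single resulting schema for all read keys.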
void registerStorageS3Impl(const String & name, StorageFactory & factory)
{
factory.registerStorage(name, [](const StorageFactory::Arguments & args)
@ -1687,70 +1755,6 @@ SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx)
return schema_cache;
}
std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
const KeysWithInfo::const_iterator & begin,
const KeysWithInfo::const_iterator & end,
const Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx)
{
auto & schema_cache = getSchemaCache(ctx);
for (auto it = begin; it < end; ++it)
{
auto get_last_mod_time = [&]
{
time_t last_modification_time = 0;
if (it->info)
{
last_modification_time = it->info->last_modification_time;
}
else
{
/// Note that in case of exception in getObjectInfo returned info will be empty,
/// but schema cache will handle this case and won't return columns from cache
/// because we can't say that it's valid without last modification time.
last_modification_time = S3::getObjectInfo(
*configuration.client,
configuration.url.bucket,
it->key,
configuration.url.version_id,
configuration.request_settings,
/*with_metadata=*/ false,
/*for_disk_s3=*/ false,
/*throw_on_error= */ false).last_modification_time;
}
return last_modification_time ? std::make_optional(last_modification_time) : std::nullopt;
};
String path = fs::path(configuration.url.bucket) / it->key;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / path;
auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, ctx);
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void StorageS3::addColumnsToCache(
const KeysWithInfo & keys,
const Configuration & configuration,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx)
{
auto host_and_bucket = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket;
Strings sources;
sources.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem.key; });
auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
auto & schema_cache = getSchemaCache(ctx);
schema_cache.addManyColumns(cache_keys, columns);
}
}
#endif

View File

@ -344,21 +344,6 @@ public:
using KeysWithInfo = StorageS3Source::KeysWithInfo;
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const KeysWithInfo::const_iterator & begin,
const KeysWithInfo::const_iterator & end,
const Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx);
static void addColumnsToCache(
const KeysWithInfo & keys,
const Configuration & configuration,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx);
bool supportsTrivialCountOptimization() const override { return true; }
protected:

View File

@ -708,30 +708,53 @@ namespace
const HTTPHeaderEntries & headers_,
const std::optional<FormatSettings> & format_settings_,
const ContextPtr & context_)
: WithContext(context_), urls_to_check(urls_to_check_), format(format_), compression_method(compression_method_), headers(headers_), format_settings(format_settings_)
: WithContext(context_), format(format_), compression_method(compression_method_), headers(headers_), format_settings(format_settings_)
{
it = urls_to_check.cbegin();
url_options_to_check.reserve(urls_to_check_.size());
for (const auto & url : urls_to_check_)
url_options_to_check.push_back(getFailoverOptions(url, getContext()->getSettingsRef().glob_expansion_max_elements));
}
std::unique_ptr<ReadBuffer> next() override
std::pair<std::unique_ptr<ReadBuffer>, std::optional<ColumnsDescription>> next() override
{
bool is_first = (current_index == 0);
/// For default mode check cached columns for all urls on first iteration.
if (is_first && getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
{
for (const auto & options : url_options_to_check)
{
if (auto cached_columns = tryGetColumnsFromCache(options))
return {nullptr, cached_columns};
}
}
std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> uri_and_buf;
do
{
if (it == urls_to_check.cend())
if (current_index == url_options_to_check.size())
{
if (first)
if (is_first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. "
"You must specify table structure manually",
format);
return nullptr;
return {nullptr, std::nullopt};
}
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION)
{
if (auto cached_columns = tryGetColumnsFromCache(url_options_to_check[current_index]))
{
++current_index;
return {nullptr, cached_columns};
}
}
auto first_option = url_options_to_check[current_index].cbegin();
uri_and_buf = StorageURLSource::getFirstAvailableURIAndReadBuffer(
it,
urls_to_check.cend(),
first_option,
url_options_to_check[current_index].cend(),
getContext(),
{},
Poco::Net::HTTPRequest::HTTP_GET,
@ -742,35 +765,87 @@ namespace
false,
false);
++it;
++current_index;
} while (getContext()->getSettingsRef().engine_url_skip_empty_files && uri_and_buf.second->eof());
first = false;
return wrapReadBufferWithCompressionMethod(
current_url_option = uri_and_buf.first.toString();
return {wrapReadBufferWithCompressionMethod(
std::move(uri_and_buf.second),
compression_method,
static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max));
static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max)), std::nullopt};
}
void setNumRowsToLastFile(size_t num_rows) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3)
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_url)
return;
String source = *std::prev(it);
auto key = getKeyForSchemaCache(source, format, format_settings, getContext());
auto key = getKeyForSchemaCache(current_url_option, format, format_settings, getContext());
StorageURL::getSchemaCache(getContext()).addNumRows(key, num_rows);
}
void setSchemaToLastFile(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_url
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::UNION)
return;
auto key = getKeyForSchemaCache(current_url_option, format, format_settings, getContext());
StorageURL::getSchemaCache(getContext()).addColumns(key, columns);
}
void setResultingSchema(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_url
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::DEFAULT)
return;
for (const auto & options : url_options_to_check)
{
auto keys = getKeysForSchemaCache(options, format, format_settings, getContext());
StorageURL::getSchemaCache(getContext()).addManyColumns(keys, columns);
}
}
String getLastFileName() const override { return current_url_option; }
private:
const std::vector<String> & urls_to_check;
std::vector<String>::const_iterator it;
std::optional<ColumnsDescription> tryGetColumnsFromCache(const Strings & urls)
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_url)
return std::nullopt;
auto & schema_cache = StorageURL::getSchemaCache(getContext());
for (const auto & url : urls)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
auto last_mod_time = StorageURL::tryGetLastModificationTime(url, headers, credentials, getContext());
/// Some URLs may not have a Last-Modified header; in that case we cannot be sure that
/// the data wasn't changed after its schema was added to the cache. Use the cached schema
/// for such URLs only if schema_inference_cache_require_modification_time_for_url is disabled.
if (!last_mod_time && !getContext()->getSettingsRef().schema_inference_cache_require_modification_time_for_url)
return 0;
return last_mod_time;
};
auto cache_key = getKeyForSchemaCache(url, format, format_settings, getContext());
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
std::vector<std::vector<String>> url_options_to_check;
size_t current_index = 0;
String current_url_option;
const String & format;
const CompressionMethod & compression_method;
const HTTPHeaderEntries & headers;
Poco::Net::HTTPBasicCredentials credentials;
const std::optional<FormatSettings> & format_settings;
bool first = true;
};
}
@ -788,39 +863,12 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
std::vector<String> urls_to_check;
if (urlWithGlobs(uri))
{
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses, "url");
for (const auto & description : uri_descriptions)
{
auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses, "url");
urls_to_check.insert(urls_to_check.end(), options.begin(), options.end());
}
}
urls_to_check = parseRemoteDescription(uri, 0, uri.size(), ',', context->getSettingsRef().glob_expansion_max_elements, "url");
else
{
urls_to_check = {uri};
}
std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().schema_inference_use_cache_for_url)
columns_from_cache = tryGetColumnsFromCache(urls_to_check, headers, credentials, format, format_settings, context);
ColumnsDescription columns;
if (columns_from_cache)
{
columns = *columns_from_cache;
}
else
{
ReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context);
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context);
}
if (context->getSettingsRef().schema_inference_use_cache_for_url)
addColumnsToCache(urls_to_check, columns, format, format_settings, context);
return columns;
ReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context);
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context);
}
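/// Usage sketch for URLs (illustrative only; the address is a placeholder). As noted in the iterator above,
/// a cached schema for a URL without a Last-Modified header is reused only when
/// schema_inference_cache_require_modification_time_for_url is disabled:
///     DESC url('https://example.com/data{1,2}.jsonl', 'JSONEachRow')
///     SETTINGS schema_inference_mode = 'union';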
bool IStorageURLBase::supportsSubsetOfColumns(const ContextPtr & context) const
@ -1033,49 +1081,6 @@ SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context)
return schema_cache;
}
std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(
const Strings & urls,
const HTTPHeaderEntries & headers,
const Poco::Net::HTTPBasicCredentials & credentials,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
for (const auto & url : urls)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
auto last_mod_time = tryGetLastModificationTime(url, headers, credentials, context);
/// Some URLs could not have Last-Modified header, in this case we cannot be sure that
/// data wasn't changed after adding it's schema to cache. Use schema from cache only if
/// special setting for this case is enabled.
if (!last_mod_time && !context->getSettingsRef().schema_inference_cache_require_modification_time_for_url)
return 0;
return last_mod_time;
};
auto cache_key = getKeyForSchemaCache(url, format_name, format_settings, context);
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void IStorageURLBase::addColumnsToCache(
const Strings & urls,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
auto cache_keys = getKeysForSchemaCache(urls, format_name, format_settings, context);
schema_cache.addManyColumns(cache_keys, columns);
}
std::optional<time_t> IStorageURLBase::tryGetLastModificationTime(
const String & url,
const HTTPHeaderEntries & headers,

View File

@ -124,21 +124,6 @@ protected:
private:
virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0;
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings & urls,
const HTTPHeaderEntries & headers,
const Poco::Net::HTTPBasicCredentials & credentials,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context);
static void addColumnsToCache(
const Strings & urls,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context);
};

View File

@ -40,7 +40,8 @@ NamesAndTypesList StorageSystemSchemaInferenceCache::getNamesAndTypes()
{"additional_format_info", std::make_shared<DataTypeString>()},
{"registration_time", std::make_shared<DataTypeDateTime>()},
{"schema", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())},
{"number_of_rows", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>())}
{"number_of_rows", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>())},
{"schema_inference_mode", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>())},
};
}
@ -64,6 +65,7 @@ static void fillDataImpl(MutableColumns & res_columns, SchemaCache & schema_cach
res_columns[6]->insert(*schema_info.num_rows);
else
res_columns[6]->insertDefault();
res_columns[7]->insert(key.schema_inference_mode);
}
}
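/// The new column can be queried directly; this is essentially the query used by the tests below
/// (the LIKE pattern is a placeholder):
///     SELECT schema_inference_mode, splitByChar('/', source)[-1] AS file, schema
///     FROM system.schema_inference_cache
///     WHERE source LIKE '%test_union%'
///     ORDER BY file;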

View File

@ -1015,3 +1015,60 @@ def test_filtering_by_file_or_path(cluster):
)
assert int(result) == 1
def test_union_schema_inference_mode(cluster):
node = cluster.instances["node"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_union_schema_inference1.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==') select 1 as a",
)
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_union_schema_inference2.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==') select 2 as b",
)
node.query("system drop schema cache for azure")
result = azure_query(
node,
"desc azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_union_schema_inference*.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==') settings schema_inference_mode='union', describe_compact_output=1 format TSV",
)
assert result == "a\tNullable(Int64)\nb\tNullable(Int64)\n"
result = node.query(
"select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%test_union_schema_inference%' order by file format TSV"
)
assert (
result == "UNION\ttest_union_schema_inference1.jsonl\ta Nullable(Int64)\n"
"UNION\ttest_union_schema_inference2.jsonl\tb Nullable(Int64)\n"
)
result = azure_query(
node,
"select * from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_union_schema_inference*.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==') order by tuple(*) settings schema_inference_mode='union', describe_compact_output=1 format TSV",
)
assert result == "1\t\\N\n" "\\N\t2\n"
node.query("system drop schema cache for azure")
result = azure_query(
node,
"desc azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_union_schema_inference2.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==') settings schema_inference_mode='union', describe_compact_output=1 format TSV",
)
assert result == "b\tNullable(Int64)\n"
result = azure_query(
node,
"desc azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_union_schema_inference*.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==') settings schema_inference_mode='union', describe_compact_output=1 format TSV",
)
assert result == "a\tNullable(Int64)\n" "b\tNullable(Int64)\n"
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_union_schema_inference3.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', TSV) select 'Error'",
)
error = azure_query(
node,
"desc azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_union_schema_inference*.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==') settings schema_inference_mode='union', describe_compact_output=1 format TSV",
expect_error="true",
)
assert "Cannot extract table structure" in error

View File

@ -998,6 +998,55 @@ def test_read_subcolumns(started_cluster):
)
def test_union_schema_inference_mode(started_cluster):
node = started_cluster.instances["node1"]
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_union_schema_inference1.jsonl') select 1 as a"
)
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_union_schema_inference2.jsonl') select 2 as b"
)
node.query("system drop schema cache for hdfs")
result = node.query(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "a\tNullable(Int64)\nb\tNullable(Int64)\n"
result = node.query(
"select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%test_union_schema_inference%' order by file format TSV"
)
assert (
result == "UNION\ttest_union_schema_inference1.jsonl\ta Nullable(Int64)\n"
"UNION\ttest_union_schema_inference2.jsonl\tb Nullable(Int64)\n"
)
result = node.query(
"select * from hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') order by tuple(*) settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "1\t\\N\n" "\\N\t2\n"
node.query("system drop schema cache for hdfs")
result = node.query(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference2.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "b\tNullable(Int64)\n"
result = node.query(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "a\tNullable(Int64)\n" "b\tNullable(Int64)\n"
node.query(
"insert into function hdfs('hdfs://hdfs1:9000/test_union_schema_inference3.jsonl', TSV) select 'Error'"
)
error = node.query_and_get_error(
"desc hdfs('hdfs://hdfs1:9000/test_union_schema_inference*.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert "Cannot extract table structure" in error
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -2072,3 +2072,65 @@ def test_filtering_by_file_or_path(started_cluster):
)
assert int(result) == 1
def test_union_schema_inference_mode(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["s3_non_default"]
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference1.jsonl') select 1 as a"
)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference2.jsonl') select 2 as b"
)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference3.jsonl') select 2 as c"
)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference4.jsonl', TSV) select 'Error'"
)
for engine in ["s3", "url"]:
instance.query("system drop schema cache for s3")
result = instance.query(
f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference{{1,2,3}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "a\tNullable(Int64)\nb\tNullable(Int64)\nc\tNullable(Int64)\n"
result = instance.query(
"select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%test_union_schema_inference%' order by file format TSV"
)
assert (
result == "UNION\ttest_union_schema_inference1.jsonl\ta Nullable(Int64)\n"
"UNION\ttest_union_schema_inference2.jsonl\tb Nullable(Int64)\n"
"UNION\ttest_union_schema_inference3.jsonl\tc Nullable(Int64)\n"
)
result = instance.query(
f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference{{1,2,3}}.jsonl') order by tuple(*) settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "1\t\\N\t\\N\n" "\\N\t2\t\\N\n" "\\N\t\\N\t2\n"
instance.query(f"system drop schema cache for {engine}")
result = instance.query(
f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference2.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert result == "b\tNullable(Int64)\n"
result = instance.query(
f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference{{1,2,3}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert (
result == "a\tNullable(Int64)\n"
"b\tNullable(Int64)\n"
"c\tNullable(Int64)\n"
)
error = instance.query_and_get_error(
f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference{{1,2,3,4}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
)
assert "Cannot extract table structure" in error

View File

@ -0,0 +1,33 @@
c Nullable(String)
b Nullable(Int64)
obj Tuple(f2 Nullable(String), f3 Nullable(Int64), f1 Nullable(Int64))
a Nullable(Int64)
{"c":"hello","b":null,"obj":{"f2":null,"f3":null,"f1":null},"a":null}
{"c":null,"b":"2","obj":{"f2":"Some string","f3":"2","f1":null},"a":null}
{"c":null,"b":null,"obj":{"f2":"2020-01-01","f3":null,"f1":"1"},"a":"1"}
UNION data1.jsonl a Nullable(Int64), obj Tuple(f1 Nullable(Int64), f2 Nullable(Date))
UNION data2.jsonl b Nullable(Int64), obj Tuple(f2 Nullable(String), f3 Nullable(Int64))
UNION data3.jsonl c Nullable(String)
c Nullable(String)
c Nullable(String)
b Nullable(Int64)
obj Tuple(f2 Nullable(String), f3 Nullable(Int64), f1 Nullable(Int64))
a Nullable(Int64)
a Nullable(Int64)
obj Tuple(f1 Nullable(Int64), f2 Nullable(String), f3 Nullable(Int64))
b Nullable(Int64)
c Nullable(String)
{"a":"1","obj":{"f1":"1","f2":"2020-01-01","f3":null},"b":null,"c":null}
{"a":null,"obj":{"f1":null,"f2":"Some string","f3":"2"},"b":"2","c":null}
{"a":null,"obj":{"f1":null,"f2":null,"f3":null},"b":null,"c":"hello"}
UNION archive.tar::data1.jsonl a Nullable(Int64), obj Tuple(f1 Nullable(Int64), f2 Nullable(Date))
UNION archive.tar::data2.jsonl b Nullable(Int64), obj Tuple(f2 Nullable(String), f3 Nullable(Int64))
UNION archive.tar::data3.jsonl c Nullable(String)
c Nullable(String)
a Nullable(Int64)
obj Tuple(f1 Nullable(Int64), f2 Nullable(String), f3 Nullable(Int64))
b Nullable(Int64)
c Nullable(String)
1
1
1

View File

@ -0,0 +1,57 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-msan, no-ubsan
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
mkdir -p $CLICKHOUSE_TEST_UNIQUE_NAME
rm -rf $CLICKHOUSE_TEST_UNIQUE_NAME/*
echo '{"a" : 1, "obj" : {"f1" : 1, "f2" : "2020-01-01"}}' > $CLICKHOUSE_TEST_UNIQUE_NAME/data1.jsonl
echo '{"b" : 2, "obj" : {"f3" : 2, "f2" : "Some string"}}' > $CLICKHOUSE_TEST_UNIQUE_NAME/data2.jsonl
echo '{"c" : "hello"}' > $CLICKHOUSE_TEST_UNIQUE_NAME/data3.jsonl
$CLICKHOUSE_LOCAL -nm -q "
set schema_inference_mode = 'union';
desc file('$CLICKHOUSE_TEST_UNIQUE_NAME/data*.jsonl');
select * from file('$CLICKHOUSE_TEST_UNIQUE_NAME/data*.jsonl') order by tuple(*) format JSONEachRow;
select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache order by file;
"
$CLICKHOUSE_LOCAL -nm -q "
set schema_inference_mode = 'union';
desc file('$CLICKHOUSE_TEST_UNIQUE_NAME/data3.jsonl');
desc file('$CLICKHOUSE_TEST_UNIQUE_NAME/data*.jsonl');
"
cd $CLICKHOUSE_TEST_UNIQUE_NAME/ && tar -cf archive.tar data1.jsonl data2.jsonl data3.jsonl && cd ..
$CLICKHOUSE_LOCAL -nm -q "
set schema_inference_mode = 'union';
desc file('$CLICKHOUSE_TEST_UNIQUE_NAME/archive.tar :: data*.jsonl');
select * from file('$CLICKHOUSE_TEST_UNIQUE_NAME/archive.tar :: data*.jsonl') order by tuple(*) format JSONEachRow;
select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache order by file;
"
$CLICKHOUSE_LOCAL -nm -q "
set schema_inference_mode = 'union';
desc file('$CLICKHOUSE_TEST_UNIQUE_NAME/archive.tar :: data3.jsonl');
desc file('$CLICKHOUSE_TEST_UNIQUE_NAME/archive.tar :: data*.jsonl');
"
echo 'Error' > $CLICKHOUSE_TEST_UNIQUE_NAME/data4.jsonl
$CLICKHOUSE_LOCAL -q "desc file('$CLICKHOUSE_TEST_UNIQUE_NAME/data*.jsonl') settings schema_inference_mode='union'" 2>&1 | grep -c -F "Cannot extract table structure"
$CLICKHOUSE_LOCAL -nm -q "
set schema_inference_mode = 'union';
desc file('$CLICKHOUSE_TEST_UNIQUE_NAME/data{2,3}.jsonl');
desc file('$CLICKHOUSE_TEST_UNIQUE_NAME/data*.jsonl');
" 2>&1 | grep -c -F "Cannot extract table structure"
echo 42 > $CLICKHOUSE_TEST_UNIQUE_NAME/data1.csv
echo 42, 43 > $CLICKHOUSE_TEST_UNIQUE_NAME/data2.csv
$CLICKHOUSE_LOCAL -q "desc file('$CLICKHOUSE_TEST_UNIQUE_NAME/data*.csv') settings schema_inference_mode='union'" 2>&1 | grep -c -F "BAD_ARGUMENTS";
rm -rf $CLICKHOUSE_TEST_UNIQUE_NAME