Merge pull request #40414 from Avogar/improve-schema-inference-cache

Improve schema inference cache, respect format settings that can change the schema
This commit is contained in:
Kruglov Pavel 2022-08-23 17:04:58 +02:00 committed by GitHub
commit 72f02bd6eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 370 additions and 126 deletions

View File

@ -535,7 +535,7 @@ void SerializationArray::deserializeTextCSV(IColumn & column, ReadBuffer & istr,
readCSV(s, istr, settings.csv);
ReadBufferFromString rb(s);
if (settings.csv.input_format_arrays_as_nested_csv)
if (settings.csv.arrays_as_nested_csv)
{
deserializeTextImpl(column, rb,
[&](IColumn & nested_column)

View File

@ -24,7 +24,7 @@ void SerializationEnum<Type>::serializeTextEscaped(const IColumn & column, size_
template <typename Type>
void SerializationEnum<Type>::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
if (settings.tsv.input_format_enum_as_number)
if (settings.tsv.enum_as_number)
assert_cast<ColumnType &>(column).getData().push_back(readValue(istr));
else
{
@ -52,7 +52,7 @@ void SerializationEnum<Type>::deserializeTextQuoted(IColumn & column, ReadBuffer
template <typename Type>
void SerializationEnum<Type>::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
if (settings.tsv.input_format_enum_as_number)
if (settings.tsv.enum_as_number)
{
assert_cast<ColumnType &>(column).getData().push_back(readValue(istr));
if (!istr.eof())
@ -100,7 +100,7 @@ void SerializationEnum<Type>::serializeTextCSV(const IColumn & column, size_t ro
template <typename Type>
void SerializationEnum<Type>::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
if (settings.csv.input_format_enum_as_number)
if (settings.csv.enum_as_number)
assert_cast<ColumnType &>(column).getData().push_back(readValue(istr));
else
{

View File

@ -697,7 +697,7 @@ DataTypePtr determineDataTypeByEscapingRule(const String & field, const FormatSe
return JSONUtils::getDataTypeFromField(field, format_settings);
case FormatSettings::EscapingRule::CSV:
{
if (!format_settings.csv.input_format_use_best_effort_in_schema_inference)
if (!format_settings.csv.use_best_effort_in_schema_inference)
return makeNullable(std::make_shared<DataTypeString>());
if (field.empty() || field == format_settings.csv.null_representation)
@ -745,7 +745,7 @@ DataTypePtr determineDataTypeByEscapingRule(const String & field, const FormatSe
case FormatSettings::EscapingRule::Raw: [[fallthrough]];
case FormatSettings::EscapingRule::Escaped:
{
if (!format_settings.tsv.input_format_use_best_effort_in_schema_inference)
if (!format_settings.tsv.use_best_effort_in_schema_inference)
return makeNullable(std::make_shared<DataTypeString>());
if (field.empty() || field == format_settings.tsv.null_representation)
@ -799,4 +799,49 @@ DataTypes getDefaultDataTypeForEscapingRules(const std::vector<FormatSettings::E
return data_types;
}
String getAdditionalFormatInfoByEscapingRule(const FormatSettings & settings, FormatSettings::EscapingRule escaping_rule)
{
String result;
/// First, settings that are common for all text formats:
result = fmt::format(
"schema_inference_hints={}, try_infer_integers={}, try_infer_dates={}, try_infer_datetimes={}, max_rows_to_read_for_schema_inference={}",
settings.schema_inference_hints,
settings.try_infer_integers,
settings.try_infer_dates,
settings.try_infer_datetimes,
settings.max_rows_to_read_for_schema_inference);
/// Second, format-specific settings:
switch (escaping_rule)
{
case FormatSettings::EscapingRule::Escaped:
case FormatSettings::EscapingRule::Raw:
result += fmt::format(
", use_best_effort_in_schema_inference={}, bool_true_representation={}, bool_false_representation={}, null_representation={}",
settings.tsv.use_best_effort_in_schema_inference,
settings.bool_true_representation,
settings.bool_false_representation,
settings.tsv.null_representation);
break;
case FormatSettings::EscapingRule::CSV:
result += fmt::format(
", use_best_effort_in_schema_inference={}, bool_true_representation={}, bool_false_representation={},"
" null_representation={}, delimiter={}, tuple_delimiter={}",
settings.tsv.use_best_effort_in_schema_inference,
settings.bool_true_representation,
settings.bool_false_representation,
settings.csv.null_representation,
settings.csv.delimiter,
settings.csv.tuple_delimiter);
break;
case FormatSettings::EscapingRule::JSON:
result += fmt::format(", try_infer_numbers_from_strings={}, read_bools_as_numbers={}", settings.json.try_infer_numbers_from_strings, settings.json.read_bools_as_numbers);
break;
default:
break;
}
return result;
}
}

View File

@ -77,4 +77,6 @@ void transformInferredTypesIfNeeded(DataTypePtr & first, DataTypePtr & second, c
void transformInferredJSONTypesIfNeeded(DataTypes & types, const FormatSettings & settings, const std::unordered_set<const IDataType *> * numbers_parsed_from_json_strings = nullptr);
void transformInferredJSONTypesIfNeeded(DataTypePtr & first, DataTypePtr & second, const FormatSettings & settings);
String getAdditionalFormatInfoByEscapingRule(const FormatSettings & settings,FormatSettings::EscapingRule escaping_rule);
}

View File

@ -63,10 +63,10 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.tuple_delimiter = settings.format_csv_delimiter;
format_settings.csv.empty_as_default = settings.input_format_csv_empty_as_default;
format_settings.csv.input_format_enum_as_number = settings.input_format_csv_enum_as_number;
format_settings.csv.enum_as_number = settings.input_format_csv_enum_as_number;
format_settings.csv.null_representation = settings.format_csv_null_representation;
format_settings.csv.input_format_arrays_as_nested_csv = settings.input_format_csv_arrays_as_nested_csv;
format_settings.csv.input_format_use_best_effort_in_schema_inference = settings.input_format_csv_use_best_effort_in_schema_inference;
format_settings.csv.arrays_as_nested_csv = settings.input_format_csv_arrays_as_nested_csv;
format_settings.csv.use_best_effort_in_schema_inference = settings.input_format_csv_use_best_effort_in_schema_inference;
format_settings.csv.skip_first_lines = settings.input_format_csv_skip_first_lines;
format_settings.hive_text.fields_delimiter = settings.input_format_hive_text_fields_delimiter;
format_settings.hive_text.collection_items_delimiter = settings.input_format_hive_text_collection_items_delimiter;
@ -124,9 +124,9 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.template_settings.row_format = settings.format_template_row;
format_settings.tsv.crlf_end_of_line = settings.output_format_tsv_crlf_end_of_line;
format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default;
format_settings.tsv.input_format_enum_as_number = settings.input_format_tsv_enum_as_number;
format_settings.tsv.enum_as_number = settings.input_format_tsv_enum_as_number;
format_settings.tsv.null_representation = settings.format_tsv_null_representation;
format_settings.tsv.input_format_use_best_effort_in_schema_inference = settings.input_format_tsv_use_best_effort_in_schema_inference;
format_settings.tsv.use_best_effort_in_schema_inference = settings.input_format_tsv_use_best_effort_in_schema_inference;
format_settings.tsv.skip_first_lines = settings.input_format_tsv_skip_first_lines;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;

View File

@ -111,11 +111,11 @@ struct FormatSettings
bool allow_double_quotes = true;
bool empty_as_default = false;
bool crlf_end_of_line = false;
bool input_format_enum_as_number = false;
bool input_format_arrays_as_nested_csv = false;
bool enum_as_number = false;
bool arrays_as_nested_csv = false;
String null_representation = "\\N";
char tuple_delimiter = ',';
bool input_format_use_best_effort_in_schema_inference = true;
bool use_best_effort_in_schema_inference = true;
UInt64 skip_first_lines = 0;
} csv;
@ -227,8 +227,8 @@ struct FormatSettings
bool empty_as_default = false;
bool crlf_end_of_line = false;
String null_representation = "\\N";
bool input_format_enum_as_number = false;
bool input_format_use_best_effort_in_schema_inference = true;
bool enum_as_number = false;
bool use_best_effort_in_schema_inference = true;
UInt64 skip_first_lines = 0;
} tsv;

View File

@ -239,26 +239,17 @@ NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header)
return result;
}
String getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
SchemaCache::Key getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
{
return getKeysForSchemaCache({source}, format, format_settings, context).front();
}
static String makeSchemaCacheKey(const String & source, const String & format, const String & additional_format_info)
static SchemaCache::Key makeSchemaCacheKey(const String & source, const String & format, const String & additional_format_info)
{
return source + "@@" + format + "@@" + additional_format_info;
return SchemaCache::Key{source, format, additional_format_info};
}
void splitSchemaCacheKey(const String & key, String & source, String & format, String & additional_format_info)
{
size_t additional_format_info_pos = key.rfind("@@");
additional_format_info = key.substr(additional_format_info_pos + 2, key.size() - additional_format_info_pos - 2);
size_t format_pos = key.rfind("@@", additional_format_info_pos - 1);
format = key.substr(format_pos + 2, additional_format_info_pos - format_pos - 2);
source = key.substr(0, format_pos);
}
Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
SchemaCache::Keys getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
{
/// For some formats data schema depends on some settings, so it's possible that
/// two queries to the same source will get two different schemas. To process this
@ -266,7 +257,7 @@ Strings getKeysForSchemaCache(const Strings & sources, const String & format, co
/// For example, for Protobuf format additional information is the path to the schema
/// and message name.
String additional_format_info = FormatFactory::instance().getAdditionalInfoForSchemaCache(format, context, format_settings);
Strings cache_keys;
SchemaCache::Keys cache_keys;
cache_keys.reserve(sources.size());
std::transform(sources.begin(), sources.end(), std::back_inserter(cache_keys), [&](const auto & source){ return makeSchemaCacheKey(source, format, additional_format_info); });
return cache_keys;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/ColumnsDescription.h>
#include <Storages/Cache/SchemaCache.h>
#include <Formats/FormatFactory.h>
namespace DB
@ -47,8 +48,8 @@ DataTypePtr makeNullableRecursivelyAndCheckForNothing(DataTypePtr type);
/// in the block and return names and types.
NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header);
String getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
SchemaCache::Key getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
SchemaCache::Keys getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
void splitSchemaCacheKey(const String & key, String & source, String & format, String & additional_format_info);
}

View File

@ -89,15 +89,13 @@ void IIRowSchemaReader::setContext(ContextPtr & context)
}
IRowSchemaReader::IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IIRowSchemaReader(in_, format_settings_)
: IIRowSchemaReader(in_, format_settings_), column_names(splitColumnNames(format_settings.column_names_for_schema_inference))
{
initColumnNames(format_settings.column_names_for_schema_inference);
}
IRowSchemaReader::IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, DataTypePtr default_type_)
: IIRowSchemaReader(in_, format_settings_, default_type_)
: IIRowSchemaReader(in_, format_settings_, default_type_), column_names(splitColumnNames(format_settings.column_names_for_schema_inference))
{
initColumnNames(format_settings.column_names_for_schema_inference);
}
IRowSchemaReader::IRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_, const DataTypes & default_types_)
@ -171,11 +169,12 @@ NamesAndTypesList IRowSchemaReader::readSchema()
return result;
}
void IRowSchemaReader::initColumnNames(const String & column_names_str)
Strings splitColumnNames(const String & column_names_str)
{
if (column_names_str.empty())
return;
return {};
Strings column_names;
/// column_names_for_schema_inference is a string in format 'column1,column2,column3,...'
boost::split(column_names, column_names_str, boost::is_any_of(","));
for (auto & column_name : column_names)
@ -184,6 +183,7 @@ void IRowSchemaReader::initColumnNames(const String & column_names_str)
if (!col_name_trimmed.empty())
column_name = col_name_trimmed;
}
return column_names;
}
DataTypePtr IRowSchemaReader::getDefaultType(size_t column) const

View File

@ -136,4 +136,6 @@ void chooseResultColumnType(
void checkResultColumnTypeAndAppend(
NamesAndTypesList & result, DataTypePtr & type, const String & name, const DataTypePtr & default_type, size_t rows_read);
Strings splitColumnNames(const String & column_names_str);
}

View File

@ -401,6 +401,13 @@ void registerCSVSchemaReader(FormatFactory & factory)
{
return std::make_shared<CSVSchemaReader>(buf, with_names, with_types, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names](const FormatSettings & settings)
{
String result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::CSV);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
};
registerWithNamesAndTypes("CSV", register_func);

View File

@ -319,7 +319,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
factory.markFormatSupportsSubsetOfColumns("CapnProto");
factory.registerFileExtension("capnp", "CapnProto");
factory.registerAdditionalInfoForSchemaCacheGetter(
"CapnProto", [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; });
"CapnProto", [](const FormatSettings & settings) { return fmt::format("format_schema={}", settings.schema.format_schema); });
}
void registerCapnProtoSchemaReader(FormatFactory & factory)

View File

@ -353,6 +353,19 @@ void registerCustomSeparatedSchemaReader(FormatFactory & factory)
{
return std::make_shared<CustomSeparatedSchemaReader>(buf, with_names, with_types, ignore_spaces, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [](const FormatSettings & settings)
{
String result = getAdditionalFormatInfoByEscapingRule(settings, settings.custom.escaping_rule);
return result + fmt::format(
", result_before_delimiter={}, row_before_delimiter={}, field_delimiter={},"
" row_after_delimiter={}, row_between_delimiter={}, result_after_delimiter={}",
settings.custom.result_before_delimiter,
settings.custom.row_before_delimiter,
settings.custom.field_delimiter,
settings.custom.row_after_delimiter,
settings.custom.row_between_delimiter,
settings.custom.result_after_delimiter);
});
};
registerWithNamesAndTypes(ignore_spaces ? "CustomSeparatedIgnoreSpaces" : "CustomSeparated", register_func);

View File

@ -1,6 +1,7 @@
#include <Processors/Formats/Impl/JSONColumnsBlockInputFormat.h>
#include <IO/ReadHelpers.h>
#include <Formats/FormatFactory.h>
#include <Formats/EscapingRuleUtils.h>
namespace DB
{
@ -66,6 +67,10 @@ void registerJSONColumnsSchemaReader(FormatFactory & factory)
return std::make_shared<JSONColumnsSchemaReaderBase>(buf, settings, std::make_unique<JSONColumnsReader>(buf));
}
);
factory.registerAdditionalInfoForSchemaCacheGetter("JSONColumns", [](const FormatSettings & settings)
{
return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
});
}
}

View File

@ -1,4 +1,5 @@
#include <Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/JSONUtils.h>
#include <Formats/EscapingRuleUtils.h>
#include <IO/ReadHelpers.h>
@ -178,7 +179,10 @@ Chunk JSONColumnsBlockInputFormatBase::generate()
JSONColumnsSchemaReaderBase::JSONColumnsSchemaReaderBase(
ReadBuffer & in_, const FormatSettings & format_settings_, std::unique_ptr<JSONColumnsReaderBase> reader_)
: ISchemaReader(in_), format_settings(format_settings_), reader(std::move(reader_))
: ISchemaReader(in_)
, format_settings(format_settings_)
, reader(std::move(reader_))
, column_names_from_settings(splitColumnNames(format_settings_.column_names_for_schema_inference))
{
}
@ -214,8 +218,15 @@ NamesAndTypesList JSONColumnsSchemaReaderBase::readSchema()
do
{
auto column_name_opt = reader->readColumnStart();
/// If format doesn't have named for columns, use default names 'c1', 'c2', ...
String column_name = column_name_opt.has_value() ? *column_name_opt : "c" + std::to_string(iteration + 1);
/// If format doesn't have names for columns, use names from setting column_names_for_schema_inference or default names 'c1', 'c2', ...
String column_name;
if (column_name_opt.has_value())
column_name = *column_name_opt;
else if (iteration < column_names_from_settings.size())
column_name = column_names_from_settings[iteration];
else
column_name = "c" + std::to_string(iteration + 1);
/// Keep order of column names as it is in input data.
if (!names_to_types.contains(column_name))
names_order.push_back(column_name);

View File

@ -87,6 +87,7 @@ private:
const FormatSettings format_settings;
std::unique_ptr<JSONColumnsReaderBase> reader;
Names column_names_from_settings;
};
}

View File

@ -1,6 +1,7 @@
#include <Processors/Formats/Impl/JSONCompactColumnsBlockInputFormat.h>
#include <IO/ReadHelpers.h>
#include <Formats/FormatFactory.h>
#include <Formats/EscapingRuleUtils.h>
namespace DB
{
@ -60,6 +61,11 @@ void registerJSONCompactColumnsSchemaReader(FormatFactory & factory)
return std::make_shared<JSONColumnsSchemaReaderBase>(buf, settings, std::make_unique<JSONCompactColumnsReader>(buf));
}
);
factory.registerAdditionalInfoForSchemaCacheGetter("JSONCompactColumns", [](const FormatSettings & settings)
{
auto result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
return result + fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
});
}
}

View File

@ -245,6 +245,11 @@ void registerJSONCompactEachRowSchemaReader(FormatFactory & factory)
{
return std::make_shared<JSONCompactEachRowRowSchemaReader>(buf, with_names, with_types, json_strings, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [](const FormatSettings & settings)
{
auto result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
return result + fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
});
};
registerWithNamesAndTypes(json_strings ? "JSONCompactStringsEachRow" : "JSONCompactEachRow", register_func);
}

View File

@ -355,44 +355,26 @@ void JSONEachRowSchemaReader::transformTypesIfNeeded(DataTypePtr & type, DataTyp
void registerInputFormatJSONEachRow(FormatFactory & factory)
{
factory.registerInputFormat("JSONEachRow", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
auto register_format = [&](const String & format_name, bool json_strings)
{
return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings, false);
});
factory.registerInputFormat(format_name, [json_strings](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings, json_strings);
});
};
factory.registerInputFormat("JSONLines", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings, false);
});
factory.registerInputFormat("NDJSON", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings, false);
});
register_format("JSONEachRow", false);
register_format("JSONLines", false);
register_format("NDJSON", false);
factory.registerFileExtension("ndjson", "JSONEachRow");
factory.registerFileExtension("jsonl", "JSONEachRow");
factory.registerInputFormat("JSONStringsEachRow", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONEachRowRowInputFormat>(buf, sample, std::move(params), settings, true);
});
register_format("JSONStringsEachRow", true);
factory.markFormatSupportsSubsetOfColumns("JSONEachRow");
factory.markFormatSupportsSubsetOfColumns("JSONLines");
@ -418,25 +400,22 @@ void registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(FormatFactory & factory
void registerJSONEachRowSchemaReader(FormatFactory & factory)
{
factory.registerSchemaReader("JSONEachRow", [](ReadBuffer & buf, const FormatSettings & settings)
auto register_schema_reader = [&](const String & format_name, bool json_strings)
{
return std::make_unique<JSONEachRowSchemaReader>(buf, false, settings);
});
factory.registerSchemaReader(format_name, [json_strings](ReadBuffer & buf, const FormatSettings & settings)
{
return std::make_unique<JSONEachRowSchemaReader>(buf, json_strings, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [](const FormatSettings & settings)
{
return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
});
};
factory.registerSchemaReader("JSONStringsEachRow", [](ReadBuffer & buf, const FormatSettings & settings)
{
return std::make_unique<JSONEachRowSchemaReader>(buf, true, settings);
});
factory.registerSchemaReader("JSONLines", [](ReadBuffer & buf, const FormatSettings & settings)
{
return std::make_unique<JSONEachRowSchemaReader>(buf, false, settings);
});
factory.registerSchemaReader("NDJSON", [](ReadBuffer & buf, const FormatSettings & settings)
{
return std::make_unique<JSONEachRowSchemaReader>(buf, false, settings);
});
register_schema_reader("JSONEachRow", false);
register_schema_reader("JSONLines", false);
register_schema_reader("NDJSON", false);
register_schema_reader("JSONStringsEachRow", true);
}
}

View File

@ -539,6 +539,14 @@ void registerMsgPackSchemaReader(FormatFactory & factory)
{
return std::make_shared<MsgPackSchemaReader>(buf, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter("MsgPack", [](const FormatSettings & settings)
{
return fmt::format(
"number_of_columns={}, schema_inference_hints={}, max_rows_to_read_for_schema_inference={}",
settings.msgpack.number_of_columns,
settings.schema_inference_hints,
settings.max_rows_to_read_for_schema_inference);
});
}
}

View File

@ -452,9 +452,6 @@ void registerInputFormatMySQLDump(FormatFactory & factory)
{
return std::make_shared<MySQLDumpRowInputFormat>(buf, header, params, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(
"MySQLDump", [](const FormatSettings & settings) { return "Table name: " + settings.mysql_dump.table_name; });
}
void registerMySQLSchemaReader(FormatFactory & factory)
@ -463,6 +460,12 @@ void registerMySQLSchemaReader(FormatFactory & factory)
{
return std::make_shared<MySQLDumpSchemaReader>(buf, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter("MySQLDump", [](const FormatSettings & settings)
{
auto result = getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::Quoted);
return result + fmt::format(", table_name={}", settings.mysql_dump.table_name);
});
}

View File

@ -82,7 +82,7 @@ void registerInputFormatProtobufList(FormatFactory & factory)
});
factory.markFormatSupportsSubsetOfColumns("ProtobufList");
factory.registerAdditionalInfoForSchemaCacheGetter(
"ProtobufList", [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; });
"ProtobufList", [](const FormatSettings & settings) { return fmt::format("format_schema={}", settings.schema.format_schema); });
}
void registerProtobufListSchemaReader(FormatFactory & factory)

View File

@ -104,7 +104,7 @@ void registerProtobufSchemaReader(FormatFactory & factory)
for (const auto & name : {"Protobuf", "ProtobufSingle"})
factory.registerAdditionalInfoForSchemaCacheGetter(
name, [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; });
name, [](const FormatSettings & settings) { return fmt::format("format_schema={}", settings.schema.format_schema); });
}
}

View File

@ -211,6 +211,11 @@ void registerRegexpSchemaReader(FormatFactory & factory)
{
return std::make_shared<RegexpSchemaReader>(buf, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter("Regexp", [](const FormatSettings & settings)
{
auto result = getAdditionalFormatInfoByEscapingRule(settings, settings.regexp.escaping_rule);
return result + fmt::format(", regexp={}", settings.regexp.regexp);
});
}
}

View File

@ -285,6 +285,10 @@ void registerTSKVSchemaReader(FormatFactory & factory)
{
return std::make_shared<TSKVSchemaReader>(buf, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter("TSKV", [](const FormatSettings & settings)
{
return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::Escaped);
});
}
}

View File

@ -302,6 +302,14 @@ void registerTSVSchemaReader(FormatFactory & factory)
{
return std::make_shared<TabSeparatedSchemaReader>(buf, with_names, with_types, is_raw, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [with_names, is_raw](const FormatSettings & settings)
{
String result = getAdditionalFormatInfoByEscapingRule(
settings, is_raw ? FormatSettings::EscapingRule::Raw : FormatSettings::EscapingRule::Escaped);
if (!with_names)
result += fmt::format(", column_names_for_schema_inference={}", settings.column_names_for_schema_inference);
return result;
});
};
registerWithNamesAndTypes(is_raw ? "TabSeparatedRaw" : "TabSeparated", register_func);

View File

@ -566,13 +566,32 @@ void registerTemplateSchemaReader(FormatFactory & factory)
{
for (bool ignore_spaces : {false, true})
{
factory.registerSchemaReader(ignore_spaces ? "TemplateIgnoreSpaces" : "Template", [ignore_spaces](ReadBuffer & buf, const FormatSettings & settings)
String format_name = ignore_spaces ? "TemplateIgnoreSpaces" : "Template";
factory.registerSchemaReader(format_name, [ignore_spaces](ReadBuffer & buf, const FormatSettings & settings)
{
size_t index = 0;
auto idx_getter = [&](const String &) -> std::optional<size_t> { return index++; };
auto row_format = fillRowFormat(settings, idx_getter, false);
return std::make_shared<TemplateSchemaReader>(buf, ignore_spaces, fillResultSetFormat(settings), row_format, settings.template_settings.row_between_delimiter, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(format_name, [](const FormatSettings & settings)
{
size_t index = 0;
auto idx_getter = [&](const String &) -> std::optional<size_t> { return index++; };
auto row_format = fillRowFormat(settings, idx_getter, false);
std::unordered_set<FormatSettings::EscapingRule> visited_escaping_rules;
String result = fmt::format("row_format={}, resultset_format={}, row_between_delimiter={}",
settings.template_settings.row_format,
settings.template_settings.resultset_format,
settings.template_settings.row_between_delimiter);
for (auto escaping_rule : row_format.escaping_rules)
{
if (!visited_escaping_rules.contains(escaping_rule))
result += ", " + getAdditionalFormatInfoByEscapingRule(settings, settings.regexp.escaping_rule);
visited_escaping_rules.insert(escaping_rule);
}
return result;
});
}
}

View File

@ -634,6 +634,10 @@ void registerValuesSchemaReader(FormatFactory & factory)
{
return std::make_shared<ValuesSchemaReader>(buf, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter("Values", [](const FormatSettings & settings)
{
return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::Quoted);
});
}
}

View File

@ -17,21 +17,21 @@ SchemaCache::SchemaCache(size_t max_elements_) : max_elements(max_elements_)
{
}
void SchemaCache::add(const String & key, const ColumnsDescription & columns)
void SchemaCache::add(const Key & key, const ColumnsDescription & columns)
{
std::lock_guard lock(mutex);
addUnlocked(key, columns);
}
void SchemaCache::addMany(const Strings & keys, const ColumnsDescription & columns)
void SchemaCache::addMany(const Keys & keys, const ColumnsDescription & columns)
{
std::lock_guard lock(mutex);
for (const auto & key : keys)
addUnlocked(key, columns);
}
void SchemaCache::addUnlocked(const String & key, const ColumnsDescription & columns)
void SchemaCache::addUnlocked(const Key & key, const ColumnsDescription & columns)
{
/// Do nothing if this key is already in cache;
if (data.contains(key))
@ -54,7 +54,7 @@ void SchemaCache::checkOverflow()
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheEvictions);
}
std::optional<ColumnsDescription> SchemaCache::tryGet(const String & key, LastModificationTimeGetter get_last_mod_time)
std::optional<ColumnsDescription> SchemaCache::tryGet(const Key & key, LastModificationTimeGetter get_last_mod_time)
{
std::lock_guard lock(mutex);
auto it = data.find(key);
@ -101,10 +101,10 @@ void SchemaCache::clear()
queue.clear();
}
std::unordered_map<String, SchemaCache::SchemaInfo> SchemaCache::getAll()
std::unordered_map<SchemaCache::Key, SchemaCache::SchemaInfo, SchemaCache::KeyHash> SchemaCache::getAll()
{
std::lock_guard lock(mutex);
std::unordered_map<String, SchemaCache::SchemaInfo> result;
std::unordered_map<Key, SchemaCache::SchemaInfo, SchemaCache::KeyHash> result;
for (const auto & [key, value] : data)
result[key] = value.schema_info;

View File

@ -23,6 +23,28 @@ class SchemaCache
public:
SchemaCache(size_t max_elements_);
struct Key
{
String source;
String format;
String additional_format_info;
bool operator==(const Key & other) const
{
return source == other.source && format == other.format && additional_format_info == other.additional_format_info;
}
};
using Keys = std::vector<Key>;
struct KeyHash
{
size_t operator()(const Key & key) const
{
return std::hash<String>()(key.source + key.format + key.additional_format_info);
}
};
struct SchemaInfo
{
ColumnsDescription columns;
@ -32,22 +54,22 @@ public:
using LastModificationTimeGetter = std::function<std::optional<time_t>()>;
/// Add new key with a schema
void add(const String & key, const ColumnsDescription & columns);
void add(const Key & key, const ColumnsDescription & columns);
/// Add many keys with the same schema (usually used for globs)
void addMany(const Strings & keys, const ColumnsDescription & columns);
void addMany(const Keys & keys, const ColumnsDescription & columns);
std::optional<ColumnsDescription> tryGet(const String & key, LastModificationTimeGetter get_last_mod_time = {});
std::optional<ColumnsDescription> tryGet(const Key & key, LastModificationTimeGetter get_last_mod_time = {});
void clear();
std::unordered_map<String, SchemaInfo> getAll();
std::unordered_map<Key, SchemaInfo, SchemaCache::KeyHash> getAll();
private:
void addUnlocked(const String & key, const ColumnsDescription & columns);
void addUnlocked(const Key & key, const ColumnsDescription & columns);
void checkOverflow();
using Queue = std::list<String>;
using Queue = std::list<Key>;
using QueueIterator = Queue::iterator;
struct Cell
@ -57,7 +79,7 @@ private:
};
Queue queue;
std::unordered_map<String, Cell> data;
std::unordered_map<Key, Cell, KeyHash> data;
size_t max_elements;
std::mutex mutex;

View File

@ -774,7 +774,7 @@ std::optional<ColumnsDescription> StorageHDFS::tryGetColumnsFromCache(
};
String url = fs::path(uri_without_path) / path;
String cache_key = getKeyForSchemaCache(url, format_name, {}, ctx);
auto cache_key = getKeyForSchemaCache(url, format_name, {}, ctx);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
@ -794,7 +794,7 @@ void StorageHDFS::addColumnsToCache(
Strings sources;
sources.reserve(paths.size());
std::transform(paths.begin(), paths.end(), std::back_inserter(sources), [&](const String & path){ return fs::path(uri_without_path) / path; });
Strings cache_keys = getKeysForSchemaCache(sources, format_name, {}, ctx);
auto cache_keys = getKeysForSchemaCache(sources, format_name, {}, ctx);
schema_cache.addMany(cache_keys, columns);
}

View File

@ -1257,7 +1257,7 @@ std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(
return file_stat.st_mtime;
};
String cache_key = getKeyForSchemaCache(path, format_name, format_settings, context);
auto cache_key = getKeyForSchemaCache(path, format_name, format_settings, context);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
@ -1274,7 +1274,7 @@ void StorageFile::addColumnsToCache(
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
Strings cache_keys = getKeysForSchemaCache(paths, format_name, format_settings, context);
auto cache_keys = getKeysForSchemaCache(paths, format_name, format_settings, context);
schema_cache.addMany(cache_keys, columns);
}

View File

@ -1389,7 +1389,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
};
String source = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / path;
String cache_key = getKeyForSchemaCache(source, format_name, format_settings, ctx);
auto cache_key = getKeyForSchemaCache(source, format_name, format_settings, ctx);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
@ -1410,7 +1410,7 @@ void StorageS3::addColumnsToCache(
Strings sources;
sources.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const String & key){ return host_and_bucket / key; });
Strings cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
auto & schema_cache = getSchemaCache(ctx);
schema_cache.addMany(cache_keys, columns);
}

View File

@ -836,7 +836,7 @@ std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(
return last_mod_time;
};
String cache_key = getKeyForSchemaCache(url, format_name, format_settings, context);
auto cache_key = getKeyForSchemaCache(url, format_name, format_settings, context);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
@ -853,7 +853,7 @@ void IStorageURLBase::addColumnsToCache(
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
Strings cache_keys = getKeysForSchemaCache(urls, format_name, format_settings, context);
auto cache_keys = getKeysForSchemaCache(urls, format_name, format_settings, context);
schema_cache.addMany(cache_keys, columns);
}

View File

@ -44,16 +44,13 @@ NamesAndTypesList StorageSystemSchemaInferenceCache::getNamesAndTypes()
static void fillDataImpl(MutableColumns & res_columns, SchemaCache & schema_cache, const String & storage_name)
{
auto s3_schema_cache_data = schema_cache.getAll();
String source;
String format;
String additional_format_info;
for (const auto & [key, schema_info] : s3_schema_cache_data)
{
splitSchemaCacheKey(key, source, format, additional_format_info);
res_columns[0]->insert(storage_name);
res_columns[1]->insert(source);
res_columns[2]->insert(format);
res_columns[3]->insert(additional_format_info);
res_columns[1]->insert(key.source);
res_columns[2]->insert(key.format);
res_columns[3]->insert(key.additional_format_info);
res_columns[4]->insert(schema_info.registration_time);
res_columns[5]->insert(getSchemaString(schema_info.columns));
}

View File

@ -1,2 +1,2 @@
insert into function file(data_02318.tsv) select * from numbers(10);
desc file('data_02318.tsv', 'Template') SETTINGS format_template_row='nonexist', format_template_resultset='nonexist'; -- {serverError CANNOT_EXTRACT_TABLE_STRUCTURE}
desc file('data_02318.tsv', 'Template') SETTINGS format_template_row='nonexist', format_template_resultset='nonexist'; -- {serverError FILE_DOESNT_EXIST}

View File

@ -0,0 +1,90 @@
TSV
c1 Nullable(Int64)
c2 Nullable(Date)
c1 Nullable(Float64)
c2 Nullable(Date)
c1 Nullable(Int64)
c2 Nullable(DateTime64(9))
c1 UInt8
c2 Nullable(Date)
4
TSVWithNames
number Nullable(Int64)
toDate(number) Nullable(Date)
number Nullable(Float64)
toDate(number) Nullable(Date)
number Nullable(Int64)
toDate(number) Nullable(DateTime64(9))
number Nullable(Int64)
toDate(number) Nullable(Date)
4
CSV
c1 Nullable(Int64)
c2 Nullable(Date)
c1 Nullable(Float64)
c2 Nullable(Date)
c1 Nullable(Int64)
c2 Nullable(DateTime64(9))
c1 UInt8
c2 Nullable(Date)
4
CSVWithNames
number Nullable(Int64)
toDate(number) Nullable(Date)
number Nullable(Float64)
toDate(number) Nullable(Date)
number Nullable(Int64)
toDate(number) Nullable(DateTime64(9))
number Nullable(Int64)
toDate(number) Nullable(Date)
4
TSKV
number Nullable(Int64)
toDate(number) Nullable(Date)
number Nullable(Float64)
toDate(number) Nullable(Date)
number Nullable(Int64)
toDate(number) Nullable(DateTime64(9))
number Nullable(Int64)
toDate(number) Nullable(Date)
4
CustomSeparated
c1 Nullable(Int64)
c2 Nullable(Date)
c1 Nullable(Float64)
c2 Nullable(Date)
c1 Nullable(Int64)
c2 Nullable(DateTime64(9))
c1 UInt8
c2 Nullable(Date)
4
JSONEachRow
number Nullable(Int64)
toDate(number) Nullable(Date)
number Nullable(Float64)
toDate(number) Nullable(Date)
number Nullable(Int64)
toDate(number) Nullable(DateTime64(9))
number Nullable(Int64)
toDate(number) Nullable(Date)
4
JSONCompactEachRow
c1 Nullable(Int64)
c2 Nullable(Date)
c1 Nullable(Float64)
c2 Nullable(Date)
c1 Nullable(Int64)
c2 Nullable(DateTime64(9))
c1 UInt8
c2 Nullable(Date)
4
Values
c1 Nullable(Int64)
c2 Nullable(Date)
c1 Nullable(Float64)
c2 Nullable(Date)
c1 Nullable(Int64)
c2 Nullable(DateTime64(9))
c1 UInt8
c2 Nullable(Date)
4

View File

@ -0,0 +1,16 @@
-- Tags: no-parallel, no-fasttest
system drop schema cache for file;
{% for format in ['TSV', 'TSVWithNames', 'CSV', 'CSVWithNames', 'TSKV', 'CustomSeparated', 'JSONEachRow', 'JSONCompactEachRow', 'Values'] -%}
select '{{ format }}';
insert into function file(02404_data.{{ format }}) select number, toDate(number) from numbers(10);
desc file(02404_data.{{ format }});
desc file(02404_data.{{ format }}) settings input_format_try_infer_integers=0;
desc file(02404_data.{{ format }}) settings input_format_try_infer_dates=0;
desc file(02404_data.{{ format }}) settings schema_inference_hints='c1 UInt8';
select count() from system.schema_inference_cache where countSubstrings(source, '02404_data.{{ format }}') > 0;
{% endfor -%}