ClickHouse/src/Formats/ReadSchemaUtils.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

222 lines
11 KiB
C++
Raw Normal View History

#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Formats/ReadSchemaUtils.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Common/assert_cast.h>
#include <Interpreters/Context.h>
#include <Storages/IStorage.h>
2023-03-31 13:56:35 +00:00
namespace DB {
namespace ErrorCodes {
extern const int EMPTY_DATA_PASSED;
extern const int BAD_ARGUMENTS;
extern const int ONLY_NULLS_WHILE_READING_SCHEMA;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
2023-03-31 13:56:35 +00:00
static std::optional <NamesAndTypesList> getOrderedColumnsList(
const NamesAndTypesList &columns_list, const Names &columns_order_hint) {
if (columns_list.size() != columns_order_hint.size())
return {};
2023-03-31 13:56:35 +00:00
std::unordered_map <String, DataTypePtr> available_columns;
for (const auto &[name, type]: columns_list)
available_columns.emplace(name, type);
2023-03-31 13:56:35 +00:00
NamesAndTypesList res;
for (const auto &name: columns_order_hint) {
auto it = available_columns.find(name);
if (it == available_columns.end())
return {};
2023-03-31 13:56:35 +00:00
res.emplace_back(name, it->second);
2022-04-19 19:16:47 +00:00
}
2023-03-31 13:56:35 +00:00
return res;
}
2023-03-31 13:56:35 +00:00
bool isRetryableSchemaInferenceError(int code) {
return code == ErrorCodes::EMPTY_DATA_PASSED || code == ErrorCodes::ONLY_NULLS_WHILE_READING_SCHEMA;
}
ColumnsDescription readSchemaFromFormat(
const String &format_name,
const std::optional <FormatSettings> &format_settings,
ReadBufferIterator &read_buffer_iterator,
bool retry,
ContextPtr &context,
std::unique_ptr <ReadBuffer> &buf) {
NamesAndTypesList names_and_types;
if (FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format_name)) {
auto external_schema_reader = FormatFactory::instance().getExternalSchemaReader(format_name, context,
format_settings);
try {
names_and_types = external_schema_reader->readSchema();
}
2023-03-31 13:56:35 +00:00
catch (Exception &e) {
2022-09-17 21:03:49 +00:00
e.addMessage(fmt::format(
2023-03-31 13:56:35 +00:00
"Cannot extract table structure from {} format file. You can specify the structure manually",
format_name));
2022-09-17 21:03:49 +00:00
throw;
}
2023-03-31 13:56:35 +00:00
} else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name)) {
std::string exception_messages;
SchemaReaderPtr schema_reader;
size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference
: context->getSettingsRef().input_format_max_rows_to_read_for_schema_inference;
size_t iterations = 0;
ColumnsDescription cached_columns;
while (true) {
bool is_eof = false;
try {
buf = read_buffer_iterator(cached_columns);
if (!buf)
break;
is_eof = buf->eof();
}
catch (Exception &e) {
e.addMessage(fmt::format(
"Cannot extract table structure from {} format file. You can specify the structure manually",
format_name));
throw;
}
catch (...) {
auto exception_message = getCurrentExceptionMessage(false);
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file:\n{}\nYou can specify the structure manually",
format_name,
exception_message);
}
2022-07-13 15:57:55 +00:00
2023-03-31 13:56:35 +00:00
++iterations;
2023-03-31 13:56:35 +00:00
if (is_eof) {
auto exception_message = fmt::format(
"Cannot extract table structure from {} format file, file is empty", format_name);
2023-03-31 13:56:35 +00:00
if (!retry)
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"{}. You can specify the structure manually", exception_message);
2023-03-31 13:56:35 +00:00
exception_messages += "\n" + exception_message;
continue;
}
2023-03-31 13:56:35 +00:00
try {
schema_reader = FormatFactory::instance().getSchemaReader(format_name, *buf, context,
format_settings);
schema_reader->setMaxRowsToRead(max_rows_to_read);
names_and_types = schema_reader->readSchema();
break;
}
catch (...) {
auto exception_message = getCurrentExceptionMessage(false);
if (schema_reader) {
size_t rows_read = schema_reader->getNumRowsRead();
assert(rows_read <= max_rows_to_read);
max_rows_to_read -= schema_reader->getNumRowsRead();
if (rows_read != 0 && max_rows_to_read == 0) {
exception_message += "\nTo increase the maximum number of rows to read for structure determination, use setting input_format_max_rows_to_read_for_schema_inference";
if (iterations > 1) {
exception_messages += "\n" + exception_message;
break;
}
retry = false;
}
}
2023-03-31 13:56:35 +00:00
if (!retry || !isRetryableSchemaInferenceError(getCurrentExceptionCode())) {
try {
throw;
}
catch (Exception &e) {
e.addMessage(fmt::format(
"Cannot extract table structure from {} format file. You can specify the structure manually",
format_name));
throw;
}
catch (...) {
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file. "
"Error: {}. You can specify the structure manually",
format_name, exception_message);
}
2022-09-17 21:03:49 +00:00
}
2023-03-31 13:56:35 +00:00
exception_messages += "\n" + exception_message;
}
}
2023-03-31 13:56:35 +00:00
if (!cached_columns.empty())
return cached_columns;
if (names_and_types.empty())
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"All attempts to extract table structure from files failed. "
"Errors:{}\nYou can specify the structure manually", exception_messages);
/// If we have "INSERT SELECT" query then try to order
/// columns as they are ordered in table schema for formats
/// without strict column order (like JSON and TSKV).
/// It will allow to execute simple data loading with query
/// "INSERT INTO table SELECT * FROM ..."
const auto &insertion_table = context->getInsertionTable();
if (!schema_reader->hasStrictOrderOfColumns() && !insertion_table.empty()) {
auto storage = DatabaseCatalog::instance().getTable(insertion_table, context);
auto metadata = storage->getInMemoryMetadataPtr();
auto names_in_storage = metadata->getColumns().getNamesOfPhysical();
auto ordered_list = getOrderedColumnsList(names_and_types, names_in_storage);
if (ordered_list)
names_and_types = *ordered_list;
}
} else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{} file format doesn't support schema inference. You must specify the structure manually",
format_name);
names_and_types.erase(std::remove_if(names_and_types.begin(), names_and_types.end(),
[](const NameAndTypePair &pair) { return pair.name.empty(); }),
names_and_types.end());
return ColumnsDescription(names_and_types);
}
2023-03-31 13:56:35 +00:00
ColumnsDescription
readSchemaFromFormat(const String &format_name, const std::optional <FormatSettings> &format_settings,
ReadBufferIterator &read_buffer_iterator, bool retry, ContextPtr &context) {
std::unique_ptr <ReadBuffer> buf_out;
return readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, retry, context, buf_out);
}
2023-03-31 13:56:35 +00:00
SchemaCache::Key getKeyForSchemaCache(const String &source, const String &format,
const std::optional <FormatSettings> &format_settings,
const ContextPtr &context) {
return getKeysForSchemaCache({source}, format, format_settings, context).front();
}
2023-03-31 13:56:35 +00:00
static SchemaCache::Key
makeSchemaCacheKey(const String &source, const String &format, const String &additional_format_info) {
return SchemaCache::Key{source, format, additional_format_info};
}
2023-03-31 13:56:35 +00:00
SchemaCache::Keys getKeysForSchemaCache(const Strings &sources, const String &format,
const std::optional <FormatSettings> &format_settings,
const ContextPtr &context) {
/// For some formats data schema depends on some settings, so it's possible that
/// two queries to the same source will get two different schemas. To process this
/// case we add some additional information specific for the format to the cache key.
/// For example, for Protobuf format additional information is the path to the schema
/// and message name.
String additional_format_info = FormatFactory::instance().getAdditionalInfoForSchemaCache(format, context,
format_settings);
SchemaCache::Keys cache_keys;
cache_keys.reserve(sources.size());
std::transform(sources.begin(), sources.end(), std::back_inserter(cache_keys),
[&](const auto &source) { return makeSchemaCacheKey(source, format, additional_format_info); });
return cache_keys;
}
}