Allow skipping unknown columns in Native format

This commit is contained in:
avogar 2022-05-13 14:27:15 +00:00
parent b17fec659a
commit cef13c2c02
5 changed files with 31 additions and 16 deletions

View File

@ -23,6 +23,7 @@ namespace ErrorCodes
extern const int INCORRECT_INDEX; extern const int INCORRECT_INDEX;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA; extern const int CANNOT_READ_ALL_DATA;
extern const int INCORRECT_DATA;
} }
@ -31,8 +32,8 @@ NativeReader::NativeReader(ReadBuffer & istr_, UInt64 server_revision_)
{ {
} }
NativeReader::NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_) NativeReader::NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_, bool skip_unknown_columns_)
: istr(istr_), header(header_), server_revision(server_revision_) : istr(istr_), header(header_), server_revision(server_revision_), skip_unknown_columns(skip_unknown_columns_)
{ {
} }
@ -186,18 +187,29 @@ Block NativeReader::read()
column.column = std::move(read_column); column.column = std::move(read_column);
bool use_in_result = true;
if (header) if (header)
{ {
/// Support insert from old clients without low cardinality type. if (header.has(column.name))
auto & header_column = header.getByName(column.name);
if (!header_column.type->equals(*column.type))
{ {
column.column = recursiveTypeConversion(column.column, column.type, header.safeGetByPosition(i).type); /// Support insert from old clients without low cardinality type.
column.type = header.safeGetByPosition(i).type; auto & header_column = header.getByName(column.name);
if (!header_column.type->equals(*column.type))
{
column.column = recursiveTypeConversion(column.column, column.type, header.safeGetByPosition(i).type);
column.type = header.safeGetByPosition(i).type;
}
}
else
{
if (!skip_unknown_columns)
throw Exception(ErrorCodes::INCORRECT_DATA, "Unknown column with name {} found while reading data in Native format", column.name);
use_in_result = false;
} }
} }
res.insert(std::move(column)); if (use_in_result)
res.insert(std::move(column));
if (use_index) if (use_index)
++index_column_it; ++index_column_it;

View File

@ -24,7 +24,7 @@ public:
/// For cases when data structure (header) is known in advance. /// For cases when data structure (header) is known in advance.
/// NOTE We may use header for data validation and/or type conversions. It is not implemented. /// NOTE We may use header for data validation and/or type conversions. It is not implemented.
NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_); NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_, bool skip_unknown_columns_ = false);
/// For cases when we have an index. It allows to skip columns. Only columns specified in the index will be read. /// For cases when we have an index. It allows to skip columns. Only columns specified in the index will be read.
NativeReader(ReadBuffer & istr_, UInt64 server_revision_, NativeReader(ReadBuffer & istr_, UInt64 server_revision_,
@ -43,6 +43,7 @@ private:
ReadBuffer & istr; ReadBuffer & istr;
Block header; Block header;
UInt64 server_revision; UInt64 server_revision;
bool skip_unknown_columns;
bool use_index = false; bool use_index = false;
IndexForNativeFormat::Blocks::const_iterator index_block_it; IndexForNativeFormat::Blocks::const_iterator index_block_it;

View File

@ -703,9 +703,9 @@ void readCSVStringInto(Vector & s, ReadBuffer & buf, const FormatSettings::CSV &
if constexpr (!std::is_same_v<Vector, NullOutput>) if constexpr (!std::is_same_v<Vector, NullOutput>)
{ {
/** CSV format can contain insignificant spaces and tabs. /** CSV format can contain insignificant spaces and tabs.
* Usually the task of skipping them is for the calling code. * Usually the task of skipping them is for the calling code.
* But in this case, it will be difficult to do this, so remove the trailing whitespace by ourself. * But in this case, it will be difficult to do this, so remove the trailing whitespace by ourself.
*/ */
size_t size = s.size(); size_t size = s.size();
while (size > 0 && (s[size - 1] == ' ' || s[size - 1] == '\t')) while (size > 0 && (s[size - 1] == ' ' || s[size - 1] == '\t'))
--size; --size;

View File

@ -15,9 +15,9 @@ namespace DB
class NativeInputFormat final : public IInputFormat class NativeInputFormat final : public IInputFormat
{ {
public: public:
NativeInputFormat(ReadBuffer & buf, const Block & header_) NativeInputFormat(ReadBuffer & buf, const Block & header_, const FormatSettings & settings)
: IInputFormat(header_, buf) : IInputFormat(header_, buf)
, reader(std::make_unique<NativeReader>(buf, header_, 0)) , reader(std::make_unique<NativeReader>(buf, header_, 0, settings.skip_unknown_fields))
, header(header_) {} , header(header_) {}
String getName() const override { return "Native"; } String getName() const override { return "Native"; }
@ -112,10 +112,11 @@ void registerInputFormatNative(FormatFactory & factory)
ReadBuffer & buf, ReadBuffer & buf,
const Block & sample, const Block & sample,
const RowInputFormatParams &, const RowInputFormatParams &,
const FormatSettings &) const FormatSettings & settings)
{ {
return std::make_shared<NativeInputFormat>(buf, sample); return std::make_shared<NativeInputFormat>(buf, sample, settings);
}); });
factory.markFormatSupportsSamplingColumns("Native");
} }
void registerOutputFormatNative(FormatFactory & factory) void registerOutputFormatNative(FormatFactory & factory)

View File

@ -19,6 +19,7 @@
<value>ORC</value> <value>ORC</value>
<value>Parquet</value> <value>Parquet</value>
<value>Arrow</value> <value>Arrow</value>
<value>Native</value>
</values> </values>
</substitution> </substitution>
</substitutions> </substitutions>