Add input_format_skip_unknown_fields new config parameter. [#METR-22801]

This commit is contained in:
Vitaliy Lyudvichenko 2016-09-20 22:11:25 +03:00
parent ec4b7c967d
commit c4666af3ab
6 changed files with 76 additions and 9 deletions

View File

@ -19,13 +19,14 @@ class ReadBuffer;
class JSONEachRowRowInputStream : public IRowInputStream
{
public:
JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & sample_);
JSONEachRowRowInputStream(ReadBuffer& istr_, const Block& sample_, bool skip_unknown_);
bool read(Block & block) override;
private:
ReadBuffer & istr;
const Block sample;
bool skip_unknown;
/// Буфер для прочитанного из потока имени поля. Используется, если его потребовалось скопировать.
String name_buf;

View File

@ -810,6 +810,9 @@ inline void skipWhitespaceIfAny(ReadBuffer & buf)
++buf.position();
}
/// Skip a single plain JSON value (string, number, null, true or false). Nested arrays and objects are not supported and cause an exception.
void skipJSONFieldPlain(ReadBuffer & buf, const String & name_of_filed = "");
/** Прочитать сериализованный эксепшен.
* При сериализации/десериализации часть информации теряется

View File

@ -223,6 +223,9 @@ struct Settings
\
/** Add HTTP CORS header */ \
M(SettingBool, add_http_cors_header, false) \
\
/** Skip columns with unknown names from input data (it works for JSONEachRow and TSKV formats). */ \
M(SettingBool, input_format_skip_unknown_fields, false)
/// Всевозможные ограничения на выполнение запроса.
Limits limits;

View File

@ -61,9 +61,15 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
else if (name == "CSVWithNames")
return std::make_shared<BlockInputStreamFromRowInputStream>(std::make_shared<CSVRowInputStream>(buf, sample, ',', true), sample, max_block_size);
else if (name == "TSKV")
return std::make_shared<BlockInputStreamFromRowInputStream>(std::make_shared<TSKVRowInputStream>(buf, sample, false), sample, max_block_size);
{
auto row_stream = std::make_shared<TSKVRowInputStream>(buf, sample, context.getSettingsRef().input_format_skip_unknown_fields);
return std::make_shared<BlockInputStreamFromRowInputStream>(std::move(row_stream), sample, max_block_size);
}
else if (name == "JSONEachRow")
return std::make_shared<BlockInputStreamFromRowInputStream>(std::make_shared<JSONEachRowRowInputStream>(buf, sample), sample, max_block_size);
{
auto row_stream = std::make_shared<JSONEachRowRowInputStream>(buf, sample, context.getSettingsRef().input_format_skip_unknown_fields);
return std::make_shared<BlockInputStreamFromRowInputStream>(std::move(row_stream), sample, max_block_size);
}
else if (name == "TabSeparatedRaw"
|| name == "BlockTabSeparated"
|| name == "Pretty"

View File

@ -12,8 +12,8 @@ namespace ErrorCodes
}
JSONEachRowRowInputStream::JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & sample_)
: istr(istr_), sample(sample_), name_map(sample.columns())
JSONEachRowRowInputStream::JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & sample_, bool skip_unknown_)
: istr(istr_), sample(sample_), skip_unknown(skip_unknown_), name_map(sample.columns())
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(istr);
@ -50,6 +50,14 @@ static StringRef readName(ReadBuffer & buf, String & tmp)
}
/// Consume the ':' that separates a field name from its value.
/// Whitespace on either side of the colon is skipped; assertChar is expected to
/// reject any other character at that position.
static void skipColonDelimeter(ReadBuffer & istr)
{
/// Optional whitespace between the field name and the colon.
skipWhitespaceIfAny(istr);
assertChar(':', istr);
/// Optional whitespace between the colon and the value.
skipWhitespaceIfAny(istr);
}
bool JSONEachRowRowInputStream::read(Block & block)
{
skipWhitespaceIfAny(istr);
@ -93,16 +101,21 @@ bool JSONEachRowRowInputStream::read(Block & block)
auto it = name_map.find(name_ref);
if (name_map.end() == it)
{
if (!skip_unknown)
throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
skipColonDelimeter(istr);
skipJSONFieldPlain(istr, name_ref.toString());
continue;
}
size_t index = it->second;
if (read_columns[index])
throw Exception("Duplicate field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
skipWhitespaceIfAny(istr);
assertChar(':', istr);
skipWhitespaceIfAny(istr);
skipColonDelimeter(istr);
read_columns[index] = true;

View File

@ -20,6 +20,7 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
extern const int CANNOT_PARSE_ESCAPE_SEQUENCE;
extern const int CANNOT_PARSE_QUOTED_STRING;
extern const int INCORRECT_DATA;
}
@ -380,6 +381,7 @@ void readQuotedString(String & s, ReadBuffer & buf)
}
template void readQuotedStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readDoubleQuotedStringInto(NullSink & s, ReadBuffer & buf);
void readDoubleQuotedString(String & s, ReadBuffer & buf)
{
@ -522,6 +524,7 @@ void readJSONString(String & s, ReadBuffer & buf)
}
template void readJSONStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readJSONStringInto<NullSink>(NullSink & s, ReadBuffer & buf);
void readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf)
@ -569,6 +572,44 @@ void readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf)
}
/** Skip one plain (non-nested) JSON value located at the current position of buf.
  *
  * Supported values: double-quoted string, number (including negative numbers),
  * null, true, false.
  *
  * name_of_filed is used only to make error messages more helpful.
  *
  * Throws Exception with code INCORRECT_DATA if the buffer is at EOF, if the value
  * is a nested object or array, if a number cannot be parsed, or if the value starts
  * with an unexpected character.
  */
void skipJSONFieldPlain(ReadBuffer & buf, const String & name_of_filed)
{
    if (buf.eof())
        throw Exception("Unexpected EOF for key '" + name_of_filed + "'", ErrorCodes::INCORRECT_DATA);

    const char first = *buf.position();

    if (first == '"') /// skip double-quoted string
    {
        NullSink sink;
        readJSONStringInto(sink, buf);
    }
    /// A JSON number may start with '-'. The cast avoids undefined behaviour of isdigit
    /// for bytes that are negative when char is signed.
    else if (first == '-' || isdigit(static_cast<unsigned char>(first))) /// skip number
    {
        double v;
        if (!tryReadFloatText(v, buf))
            throw Exception("Expected a number field for key '" + name_of_filed + "'", ErrorCodes::INCORRECT_DATA);
    }
    else if (first == 'n') /// skip null
    {
        assertString("null", buf);
    }
    else if (first == 't') /// skip true
    {
        assertString("true", buf);
    }
    else if (first == 'f') /// skip false
    {
        assertString("false", buf);
    }
    else if (first == '{' || first == '[') /// fail on nested objects
    {
        throw Exception("Unexpected nested field for key '" + name_of_filed + "'", ErrorCodes::INCORRECT_DATA);
    }
    else
    {
        throw Exception("Unexpected symbol for key '" + name_of_filed + "'", ErrorCodes::INCORRECT_DATA);
    }
}
void readException(Exception & e, ReadBuffer & buf, const String & additional_message)
{
int code = 0;