2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/ReadHelpers.h>
|
2018-06-10 19:22:49 +00:00
|
|
|
|
|
|
|
#include <Formats/JSONEachRowRowInputStream.h>
|
|
|
|
#include <Formats/FormatFactory.h>
|
|
|
|
#include <Formats/BlockInputStreamFromRowInputStream.h>
|
2016-02-18 11:44:50 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes used by this file. They are only declared here (extern);
/// the definitions are not part of this translation unit's visible code.
namespace ErrorCodes
{
    /// Thrown when the stream ends in the middle of a row.
    extern const int CANNOT_READ_ALL_DATA;
    /// Thrown for an unknown field (when skipping is disabled) or a duplicated field.
    extern const int INCORRECT_DATA;
}
|
|
|
|
|
|
|
|
|
2018-06-08 01:51:55 +00:00
|
|
|
/** Creates a row reader over `istr_` that produces rows matching `header_`.
  * Besides storing its arguments, the constructor skips a leading BOM
  * and builds the lookup table from column name to column position.
  */
JSONEachRowRowInputStream::JSONEachRowRowInputStream(
    ReadBuffer & istr_,
    const Block & header_,
    const FormatSettings & format_settings)
    : istr(istr_)
    , header(header_)
    , format_settings(format_settings)
    , name_map(header.columns())
{
    /// A BOM at the very start of the stream cannot be mistaken for a value in this format, so dropping it is safe.
    skipBOMIfExists(istr);

    /// Register every header column under its name.
    /// NOTE The names could be laid out in a more cache-friendly way.
    for (size_t pos = 0, total = header.columns(); pos < total; ++pos)
    {
        const String & column = column_name(pos);
        name_map[column] = pos;
    }
}
|
|
|
|
|
|
|
|
/// Returns the name of the header column at position `i`
/// (looked up through `header.safeGetByPosition`).
const String & JSONEachRowRowInputStream::column_name(size_t i) const
{
    const auto & column = header.safeGetByPosition(i);
    return column.name;
}
|
|
|
|
|
2018-09-14 08:27:49 +00:00
|
|
|
namespace
{

/// Sentinel column index (all bits set) meaning "this key is not a column of the header".
constexpr size_t UNKNOWN_FIELD = static_cast<size_t>(-1);

} // unnamed namespace
|
|
|
|
|
|
|
|
size_t JSONEachRowRowInputStream::get_column_index(const StringRef& name) const
|
|
|
|
{
|
|
|
|
/// NOTE Optimization is possible by caching the order of fields (which is almost always the same)
|
|
|
|
/// and a quick check to match the next expected field, instead of searching the hash table.
|
|
|
|
|
|
|
|
const auto it = name_map.find(name);
|
|
|
|
return name_map.end() == it ? UNKNOWN_FIELD : it->second;
|
|
|
|
}
|
2016-02-18 11:44:50 +00:00
|
|
|
|
2017-03-25 20:12:56 +00:00
|
|
|
/** Read the field name in JSON format.
|
|
|
|
* A reference to the field name will be written to ref.
|
|
|
|
* You can also use temporary `tmp` buffer to copy field name there.
|
2016-02-18 11:44:50 +00:00
|
|
|
*/
|
|
|
|
static StringRef readName(ReadBuffer & buf, String & tmp)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
if (buf.position() + 1 < buf.buffer().end())
|
|
|
|
{
|
|
|
|
const char * next_pos = find_first_symbols<'\\', '"'>(buf.position() + 1, buf.buffer().end());
|
|
|
|
|
|
|
|
if (next_pos != buf.buffer().end() && *next_pos != '\\')
|
|
|
|
{
|
|
|
|
/// The most likely option is that there is no escape sequence in the key name, and the entire name is placed in the buffer.
|
|
|
|
assertChar('"', buf);
|
|
|
|
StringRef res(buf.position(), next_pos - buf.position());
|
|
|
|
buf.position() += next_pos - buf.position();
|
|
|
|
assertChar('"', buf);
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
readJSONString(tmp, buf);
|
|
|
|
return tmp;
|
2016-02-18 11:44:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2016-09-20 19:11:25 +00:00
|
|
|
/// Consumes the ':' that separates a key from its value.
/// Whitespace is tolerated on both sides; a missing colon is reported by assertChar.
static void skipColonDelimeter(ReadBuffer & istr)
{
    skipWhitespaceIfAny(istr);      /// optional whitespace before the colon
    assertChar(':', istr);          /// the separator itself is mandatory
    skipWhitespaceIfAny(istr);      /// optional whitespace before the value
}
|
|
|
|
|
2018-09-14 07:28:49 +00:00
|
|
|
void JSONEachRowRowInputStream::skipUnknownField(const StringRef& name_ref)
|
|
|
|
{
|
|
|
|
if (!format_settings.skip_unknown_fields)
|
|
|
|
throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
|
|
|
|
|
|
|
|
skipJSONField(istr, name_ref);
|
|
|
|
}
|
|
|
|
|
|
|
|
void JSONEachRowRowInputStream::readField(size_t index, MutableColumns & columns)
|
|
|
|
{
|
|
|
|
if (read_columns[index])
|
|
|
|
throw Exception("Duplicate field found while parsing JSONEachRow format: " + column_name(index), ErrorCodes::INCORRECT_DATA);
|
|
|
|
|
|
|
|
read_columns[index] = true;
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
|
|
|
header.getByPosition(index).type->deserializeTextJSON(*columns[index], istr, format_settings);
|
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
|
|
|
e.addMessage("(while read the value of key " + column_name(index) + ")");
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
2016-09-20 19:11:25 +00:00
|
|
|
|
2018-09-14 08:27:49 +00:00
|
|
|
bool JSONEachRowRowInputStream::advance_to_next_key(size_t key_index)
|
|
|
|
{
|
|
|
|
skipWhitespaceIfAny(istr);
|
|
|
|
|
|
|
|
if (istr.eof())
|
|
|
|
throw Exception("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
|
|
|
|
else if (*istr.position() == '}')
|
|
|
|
{
|
|
|
|
++istr.position();
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (key_index > 0)
|
|
|
|
{
|
|
|
|
assertChar(',', istr);
|
|
|
|
skipWhitespaceIfAny(istr);
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2017-12-14 20:58:18 +00:00
|
|
|
/** Reads one JSON object from the stream and appends one value to every column in `columns`.
  * Columns whose keys are absent from the object receive the default value of their type.
  *
  * Returns false if the stream is exhausted before a new row starts, true after a row has been read.
  */
bool JSONEachRowRowInputStream::read(MutableColumns & columns)
{
    skipWhitespaceIfAny(istr);

    /// The separator (',' or ';') left after the previous row is consumed here, before scanning
    /// a new row, rather than at the end of the previous one. The reason: when an exact number of
    /// rows is wanted (LIMIT x) from a streaming table engine with a text format, like File or Kafka,
    /// seeking to the next separator at the end would trigger reading of an extra row.
    /// The semicolon is accepted for convenience, as it may terminate an INSERT query.
    if (!istr.eof())
    {
        const char separator = *istr.position();
        if (separator == ',' || separator == ';')
            ++istr.position();
    }

    skipWhitespaceIfAny(istr);
    if (istr.eof())
        return false;

    assertChar('{', istr);

    const size_t num_columns = columns.size();

    /// Flags of the columns whose values were read for this row. The rest are filled with defaults.
    /// TODO Ability to provide your DEFAULTs.
    read_columns.assign(num_columns, false);

    size_t keys_seen = 0;
    while (advance_to_next_key(keys_seen))
    {
        const StringRef name_ref = readName(istr, name_buf);
        skipColonDelimeter(istr);

        const size_t position_in_header = get_column_index(name_ref);
        if (position_in_header != UNKNOWN_FIELD)
            readField(position_in_header, columns);
        else
            skipUnknownField(name_ref);

        ++keys_seen;
    }

    /// Give default values to the columns that did not appear in the object.
    for (size_t col = 0; col < num_columns; ++col)
    {
        if (read_columns[col])
            continue;

        header.getByPosition(col).type->insertDefaultInto(*columns[col]);
    }

    return true;
}
|
|
|
|
|
2017-01-27 04:29:47 +00:00
|
|
|
|
|
|
|
void JSONEachRowRowInputStream::syncAfterError()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
skipToUnescapedNextLineOrEOF(istr);
|
2017-01-27 04:29:47 +00:00
|
|
|
}
|
|
|
|
|
2018-06-10 19:22:49 +00:00
|
|
|
|
|
|
|
/// Registers the "JSONEachRow" input format in `factory`.
/// The registered creator wraps a JSONEachRowRowInputStream into a BlockInputStreamFromRowInputStream.
void registerInputFormatJSONEachRow(FormatFactory & factory)
{
    auto create_stream = [](
        ReadBuffer & buf,
        const Block & sample,
        const Context &,
        size_t max_block_size,
        const FormatSettings & settings)
    {
        return std::make_shared<BlockInputStreamFromRowInputStream>(
            std::make_shared<JSONEachRowRowInputStream>(buf, sample, settings),
            sample,
            max_block_size,
            settings);
    };

    factory.registerInputFormat("JSONEachRow", create_stream);
}
|
|
|
|
|
2016-02-18 11:44:50 +00:00
|
|
|
}
|