Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-28 10:31:57 +00:00)

Merge branch 'veloman-yunkan-jsoneachrow_import_nested'

Commit 599ec4c577
@@ -41,6 +41,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
     format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
     format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
     format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
+    format_settings.import_nested_json = settings.input_format_import_nested_json;
     format_settings.date_time_input_format = settings.date_time_input_format;
     format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
     format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
@@ -49,6 +49,7 @@ struct FormatSettings

    bool skip_unknown_fields = false;
    bool write_statistics = true;
+   bool import_nested_json = false;

    enum class DateTimeInputFormat
    {
@@ -3,7 +3,7 @@
 #include <Formats/JSONEachRowRowInputStream.h>
 #include <Formats/FormatFactory.h>
 #include <Formats/BlockInputStreamFromRowInputStream.h>
-
+#include <DataTypes/NestedUtils.h>

 namespace DB
 {
@@ -14,6 +14,17 @@ namespace ErrorCodes
     extern const int CANNOT_READ_ALL_DATA;
 }

+namespace
+{
+
+enum
+{
+    UNKNOWN_FIELD = size_t(-1),
+    NESTED_FIELD = size_t(-2)
+};
+
+} // unnamed namespace
+

 JSONEachRowRowInputStream::JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & format_settings)
     : istr(istr_), header(header_), format_settings(format_settings), name_map(header.columns())
@@ -23,17 +34,42 @@ JSONEachRowRowInputStream::JSONEachRowRowInputStream(ReadBuffer & istr_, const B

     size_t num_columns = header.columns();
     for (size_t i = 0; i < num_columns; ++i)
-        name_map[header.safeGetByPosition(i).name] = i;    /// NOTE You could place names more cache-locally.
+    {
+        const String& colname = columnName(i);
+        name_map[colname] = i;    /// NOTE You could place names more cache-locally.
+        if (format_settings.import_nested_json)
+        {
+            const auto splitted = Nested::splitName(colname);
+            if (!splitted.second.empty())
+            {
+                const StringRef table_name(colname.data(), splitted.first.size());
+                name_map[table_name] = NESTED_FIELD;
+            }
+        }
+    }
 }

-
-/** Read the field name in JSON format.
-  * A reference to the field name will be written to ref.
-  * You can also use temporary `tmp` buffer to copy field name there.
-  */
-static StringRef readName(ReadBuffer & buf, String & tmp)
+const String& JSONEachRowRowInputStream::columnName(size_t i) const
 {
-    if (buf.position() + 1 < buf.buffer().end())
+    return header.safeGetByPosition(i).name;
+}
+
+size_t JSONEachRowRowInputStream::columnIndex(const StringRef& name) const
+{
+    /// NOTE Optimization is possible by caching the order of fields (which is almost always the same)
+    /// and a quick check to match the next expected field, instead of searching the hash table.
+
+    const auto it = name_map.find(name);
+    return name_map.end() == it ? UNKNOWN_FIELD : it->second;
+}
+
+/** Read the field name and convert it to column name
+  * (taking into account the current nested name prefix)
+  */
+StringRef JSONEachRowRowInputStream::readColumnName(ReadBuffer & buf)
+{
+    // This is just an optimization: try to avoid copying the name into current_column_name
+    if (nested_prefix_length == 0 && buf.position() + 1 < buf.buffer().end())
     {
         const char * next_pos = find_first_symbols<'\\', '"'>(buf.position() + 1, buf.buffer().end());

@@ -48,8 +84,9 @@ static StringRef readName(ReadBuffer & buf, String & tmp)
         }
     }

-    readJSONString(tmp, buf);
-    return tmp;
+    current_column_name.resize(nested_prefix_length);
+    readJSONStringInto(current_column_name, buf);
+    return current_column_name;
 }

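To make the lookup scheme in the two hunks above easier to follow: the constructor registers every full column name in name_map and, when import_nested_json is enabled, additionally maps the common prefix of nested columns (e.g. 'n' for 'n.s' and 'n.i') to the NESTED_FIELD sentinel, so that columnIndex() can route a top-level JSON key either to a concrete column, to nested handling, or to the unknown-field path. Below is a minimal standalone sketch of that idea, assuming a plain std::unordered_map and a hand-rolled splitName in place of ClickHouse's HashMap and Nested::splitName (illustrative only, not the library code):

#include <cstddef>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

// Sentinels mirroring the unnamed-namespace enum added above.
constexpr std::size_t UNKNOWN_FIELD = static_cast<std::size_t>(-1);
constexpr std::size_t NESTED_FIELD  = static_cast<std::size_t>(-2);

// Hand-rolled stand-in for Nested::splitName: "n.i" -> {"n", "i"};
// a name without a dot yields an empty second part.
static std::pair<std::string, std::string> splitName(const std::string & name)
{
    const auto dot = name.find('.');
    if (dot == std::string::npos)
        return {name, ""};
    return {name.substr(0, dot), name.substr(dot + 1)};
}

int main()
{
    // Columns of a table like: CREATE TABLE t (d1 UInt8, n Nested(s String, i Int32))
    const std::string columns[] = {"d1", "n.s", "n.i"};

    std::unordered_map<std::string, std::size_t> name_map;
    for (std::size_t i = 0; i < 3; ++i)
    {
        name_map[columns[i]] = i;                  // full column name -> position in the block
        const auto split = splitName(columns[i]);
        if (!split.second.empty())
            name_map[split.first] = NESTED_FIELD;  // "n" -> sentinel meaning "a nested object follows"
    }

    // A top-level key is routed to a regular column, to nested handling, or reported as unknown.
    for (const std::string key : {"d1", "n", "junk"})
    {
        const auto it = name_map.find(key);
        const std::size_t index = (it == name_map.end()) ? UNKNOWN_FIELD : it->second;
        if (index == NESTED_FIELD)
            std::cout << key << " -> nested object\n";
        else if (index == UNKNOWN_FIELD)
            std::cout << key << " -> unknown field (skipped or an error)\n";
        else
            std::cout << key << " -> column #" << index << "\n";
    }
    return 0;
}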
@@ -60,6 +97,80 @@ static void skipColonDelimeter(ReadBuffer & istr)
     skipWhitespaceIfAny(istr);
 }

+void JSONEachRowRowInputStream::skipUnknownField(const StringRef & name_ref)
+{
+    if (!format_settings.skip_unknown_fields)
+        throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
+
+    skipJSONField(istr, name_ref);
+}
+
+void JSONEachRowRowInputStream::readField(size_t index, MutableColumns & columns)
+{
+    if (read_columns[index])
+        throw Exception("Duplicate field found while parsing JSONEachRow format: " + columnName(index), ErrorCodes::INCORRECT_DATA);
+
+    try
+    {
+        header.getByPosition(index).type->deserializeTextJSON(*columns[index], istr, format_settings);
+    }
+    catch (Exception & e)
+    {
+        e.addMessage("(while read the value of key " + columnName(index) + ")");
+        throw;
+    }
+
+    read_columns[index] = true;
+}
+
+bool JSONEachRowRowInputStream::advanceToNextKey(size_t key_index)
+{
+    skipWhitespaceIfAny(istr);
+
+    if (istr.eof())
+        throw Exception("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
+    else if (*istr.position() == '}')
+    {
+        ++istr.position();
+        return false;
+    }
+
+    if (key_index > 0)
+    {
+        assertChar(',', istr);
+        skipWhitespaceIfAny(istr);
+    }
+    return true;
+}
+
+void JSONEachRowRowInputStream::readJSONObject(MutableColumns & columns)
+{
+    assertChar('{', istr);
+
+    for (size_t key_index = 0; advanceToNextKey(key_index); ++key_index)
+    {
+        StringRef name_ref = readColumnName(istr);
+
+        skipColonDelimeter(istr);
+
+        const size_t column_index = columnIndex(name_ref);
+        if (column_index == UNKNOWN_FIELD)
+            skipUnknownField(name_ref);
+        else if (column_index == NESTED_FIELD)
+            readNestedData(name_ref.toString(), columns);
+        else
+            readField(column_index, columns);
+    }
+}
+
+void JSONEachRowRowInputStream::readNestedData(const String & name, MutableColumns & columns)
+{
+    current_column_name = name;
+    current_column_name.push_back('.');
+    nested_prefix_length = current_column_name.size();
+    readJSONObject(columns);
+    nested_prefix_length = 0;
+}

 bool JSONEachRowRowInputStream::read(MutableColumns & columns)
 {
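readNestedData() above reuses readJSONObject() for the inner object and passes the enclosing key to readColumnName() through current_column_name and nested_prefix_length: the prefix ('n.') is kept in the buffer and each inner key is appended to it to form the full column name such as 'n.s'. A small self-contained sketch of that prefix trick, with a read_column_name lambda standing in for the real method (illustrative only, not the ClickHouse implementation):

#include <cstddef>
#include <iostream>
#include <string>

int main()
{
    // State mirroring current_column_name / nested_prefix_length in the diff above.
    std::string current_column_name;
    std::size_t nested_prefix_length = 0;

    // Stand-in for readColumnName(): keep the nested prefix, append the key just read.
    auto read_column_name = [&](const std::string & key_from_json)
    {
        current_column_name.resize(nested_prefix_length);  // keep only the "n." prefix (or nothing)
        current_column_name += key_from_json;
        return current_column_name;
    };

    // Top level: no prefix, a key maps to itself.
    std::cout << read_column_name("d1") << "\n";   // prints: d1

    // Entering the nested object found under key "n" (what readNestedData() does).
    current_column_name = "n";
    current_column_name.push_back('.');
    nested_prefix_length = current_column_name.size();

    std::cout << read_column_name("s") << "\n";    // prints: n.s
    std::cout << read_column_name("i") << "\n";    // prints: n.i

    // Leaving the nested object restores top-level behaviour.
    nested_prefix_length = 0;
    std::cout << read_column_name("d2") << "\n";   // prints: d2
    return 0;
}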
@@ -78,71 +189,14 @@ bool JSONEachRowRowInputStream::read(MutableColumns & columns)
     if (istr.eof())
         return false;

-    assertChar('{', istr);
-
     size_t num_columns = columns.size();

     /// Set of columns for which the values were read. The rest will be filled with default values.
     /// TODO Ability to provide your DEFAULTs.
-    bool read_columns[num_columns];
-    memset(read_columns, 0, num_columns);
+    read_columns.assign(num_columns, false);

-    bool first = true;
-    while (true)
-    {
-        skipWhitespaceIfAny(istr);
-
-        if (istr.eof())
-            throw Exception("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
-        else if (*istr.position() == '}')
-        {
-            ++istr.position();
-            break;
-        }
-
-        if (first)
-            first = false;
-        else
-        {
-            assertChar(',', istr);
-            skipWhitespaceIfAny(istr);
-        }
-
-        StringRef name_ref = readName(istr, name_buf);
-
-        /// NOTE Optimization is possible by caching the order of fields (which is almost always the same)
-        /// and a quick check to match the next expected field, instead of searching the hash table.
-
-        auto it = name_map.find(name_ref);
-        if (name_map.end() == it)
-        {
-            if (!format_settings.skip_unknown_fields)
-                throw Exception("Unknown field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
-
-            skipColonDelimeter(istr);
-            skipJSONField(istr, name_ref);
-            continue;
-        }
-
-        size_t index = it->second;
-
-        if (read_columns[index])
-            throw Exception("Duplicate field found while parsing JSONEachRow format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
-
-        skipColonDelimeter(istr);
-
-        read_columns[index] = true;
-
-        try
-        {
-            header.getByPosition(index).type->deserializeTextJSON(*columns[index], istr, format_settings);
-        }
-        catch (Exception & e)
-        {
-            e.addMessage("(while read the value of key " + name_ref.toString() + ")");
-            throw;
-        }
-    }
+    nested_prefix_length = 0;
+    readJSONObject(columns);

     /// Fill non-visited columns with the default values.
     for (size_t i = 0; i < num_columns; ++i)
@@ -26,6 +26,16 @@ public:
     bool allowSyncAfterError() const override { return true; }
     void syncAfterError() override;

+private:
+    const String & columnName(size_t i) const;
+    size_t columnIndex(const StringRef & name) const;
+    bool advanceToNextKey(size_t key_index);
+    void skipUnknownField(const StringRef & name_ref);
+    StringRef readColumnName(ReadBuffer & buf);
+    void readField(size_t index, MutableColumns & columns);
+    void readJSONObject(MutableColumns & columns);
+    void readNestedData(const String & name, MutableColumns & columns);
+
 private:
     ReadBuffer & istr;
     Block header;
@@ -33,7 +43,19 @@ private:
     const FormatSettings format_settings;

     /// Buffer for the read from the stream field name. Used when you have to copy it.
-    String name_buf;
+    /// Also, if processing of Nested data is in progress, it holds the common prefix
+    /// of the nested column names (so that appending the field name to it produces
+    /// the full column name)
+    String current_column_name;
+
+    /// If processing Nested data, holds the length of the common prefix
+    /// of the names of related nested columns. For example, for a table
+    /// created as follows
+    ///        CREATE TABLE t (n Nested (i Int32, s String))
+    /// the nested column names are 'n.i' and 'n.s' and the nested prefix is 'n.'
+    size_t nested_prefix_length = 0;
+
+    std::vector<UInt8> read_columns;

     /// Hash table match `field name -> position in the block`. NOTE You can use perfect hash map.
     using NameMap = HashMap<StringRef, size_t, StringRefHash>;
@@ -667,6 +667,7 @@ void readJSONString(String & s, ReadBuffer & buf)
 template void readJSONStringInto<PaddedPODArray<UInt8>, void>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
 template bool readJSONStringInto<PaddedPODArray<UInt8>, bool>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
 template void readJSONStringInto<NullSink>(NullSink & s, ReadBuffer & buf);
+template void readJSONStringInto<String>(String & s, ReadBuffer & buf);


 template <typename ReturnType>
@@ -148,6 +148,7 @@ struct Settings
     M(SettingBool, add_http_cors_header, false, "Write add http CORS header.") \
     \
     M(SettingBool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow and TSKV formats).") \
+    M(SettingBool, input_format_import_nested_json, false, "Map nested JSON data to nested tables (it works for JSONEachRow format).") \
     \
     M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.") \
     \
@@ -0,0 +1,15 @@
+1 ok ['abc','def'] [1,23]
+0  [] []
+0  ['x','y','z'] [45,67,8]
+1 ok ['dog','cat','pig'] [3,3,3]
+1 ok ['zero','negative one'] [0,-1]
+1 ok [] []
+0  [] []
+0  [] []
+1 ok [] []
+1 ok [] []
+1 ok ['abc','def'] [1,23]
+0  [] []
+0  ['x','y','z'] [45,67,8]
+1 ok ['dog','cat','pig'] [3,3,3]
+1 ok ['zero','negative one'] [0,-1]
dbms/tests/queries/0_stateless/00715_json_each_row_input_nested.sh (new executable file, 38 lines)
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+
+set -e
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. $CURDIR/../shell_config.sh
+
+$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.json_each_row_nested"
+$CLICKHOUSE_CLIENT -q "CREATE TABLE test.json_each_row_nested (d1 UInt8, d2 String, n Nested (s String, i Int32) ) ENGINE = Memory"
+
+echo '{"d1" : 1, "d2" : "ok", "n.s" : ["abc", "def"], "n.i" : [1, 23]}
+{ }
+{"t1" : 0, "n.t2":true,"n.i":[45, 67, 8], "n.s":["x", "y", "z"],"t5":[],"t6":"trash" }
+{"d2":"ok","n.s":["dog", "cat", "pig"], "n.x":[["1","2"]], "d1":"1", "n.i":[3, 3, 3]}
+{"t0" : -0.1, "n.s" : ["zero","negative one"], "a.b" : 0, "n.i" : [0, -1], "d2" : "ok", "d1" : 1}' \
+| $CLICKHOUSE_CLIENT --input_format_skip_unknown_fields=1 -q "INSERT INTO test.json_each_row_nested FORMAT JSONEachRow"
+
+$CLICKHOUSE_CLIENT --max_threads=1 -q "SELECT * FROM test.json_each_row_nested"
+
+test_nested_json()
+{
+$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.json_each_row_nested"
+
+$CLICKHOUSE_CLIENT -q "CREATE TABLE test.json_each_row_nested (d1 UInt8, d2 String, n Nested (s String, i Int32) ) ENGINE = Memory"
+
+echo '{"d1" : 1, "d2" : "ok", "n" : { "s" : ["abc", "def"], "i" : [1, 23]} }
+{ }
+{"t1" : 0, "n.t2":true,"n" : {"i":[45, 67, 8], "s":["x", "y", "z"]}, "t5":[],"t6":"trash" }
+{"d2":"ok","n" : {"s":["dog", "cat", "pig"], "x":[["1","2"]], "i":[3, 3, 3]}, "d1":"1", "n.j":[4, 4, 4]}
+{"t0" : -0.1, "n": {"s" : ["zero","negative one"], "i" : [0, -1]}, "d2" : "ok", "d1" : 1}' \
+| $CLICKHOUSE_CLIENT "$@" --input_format_skip_unknown_fields=1 -q "INSERT INTO test.json_each_row_nested FORMAT JSONEachRow"
+
+$CLICKHOUSE_CLIENT --max_threads=1 -q "SELECT * FROM test.json_each_row_nested"
+$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.json_each_row_nested"
+}
+
+test_nested_json
+test_nested_json --input_format_import_nested_json=1