JSONCompactEachRow and JSONCompactEachRowWithNamesAndTypes (#7841)

* Research commit

* Created Output EachRow Format

* Fixed bugs

* Created output format JSONCompactEachRowWithNamesAndTypes without totals

* Fixed bugs

* Fixed bugs

* Totals for JSONCompactEachRowWithNamesAndTypes

* Deleted needless debug

* Working commit

* Working commit

* Working commit

* Working commit

* Working commit

* Working commit

* Working commit

* Working commit

* Working commit

* Working commit

* Working commit

* Working commit

* Tests added

* Input Format for JSONCompactEachRow

* Fixed bugs for JSONCompactEachRow Input format

* Fixed bugs for JSONCompactEachRowRowInputFormat.cpp

* JSONCompactEachRow and JSONCompactEachRowWithNamesAndTypes united

* Created input format for both formats

* fixed bugs

* fixed bugs

* Working commit

* Working commit

* Working commit

* Working commit

* tests

* Working commit

* Final tests

* Performance tests added
This commit is contained in:
Mikhail Korotov 2019-12-05 13:13:40 +03:00 committed by tavplubix
parent 529293faad
commit 88e37020e0
9 changed files with 570 additions and 0 deletions

View File

@ -281,6 +281,8 @@ void registerInputFormatProcessorTSKV(FormatFactory & factory);
void registerOutputFormatProcessorTSKV(FormatFactory & factory);
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerInputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorORC(FormatFactory & factory);
void registerOutputFormatProcessorParquet(FormatFactory & factory);
@ -336,6 +338,8 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorTSKV(*this);
registerInputFormatProcessorJSONEachRow(*this);
registerOutputFormatProcessorJSONEachRow(*this);
registerInputFormatProcessorJSONCompactEachRow(*this);
registerOutputFormatProcessorJSONCompactEachRow(*this);
registerInputFormatProcessorProtobuf(*this);
registerOutputFormatProcessorProtobuf(*this);
registerInputFormatProcessorCapnProto(*this);

View File

@ -0,0 +1,238 @@
#include <IO/ReadHelpers.h>
#include <Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int CANNOT_READ_ALL_DATA;
extern const int LOGICAL_ERROR;
}
JSONCompactEachRowRowInputFormat::JSONCompactEachRowRowInputFormat(ReadBuffer & in_,
const Block & header_,
Params params_,
const FormatSettings & format_settings_,
bool with_names_)
: IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), with_names(with_names_)
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(in);
auto & sample = getPort().getHeader();
size_t num_columns = sample.columns();
data_types.resize(num_columns);
column_indexes_by_names.reserve(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
const auto & column_info = sample.getByPosition(i);
data_types[i] = column_info.type;
column_indexes_by_names.emplace(column_info.name, i);
}
}
void JSONCompactEachRowRowInputFormat::readPrefix()
{
if (with_names)
{
size_t num_columns = getPort().getHeader().columns();
read_columns.assign(num_columns, false);
assertChar('[', in);
do
{
skipWhitespaceIfAny(in);
String column_name;
readJSONString(column_name, in);
addInputColumn(column_name);
skipWhitespaceIfAny(in);
}
while (checkChar(',', in));
assertChar(']', in);
skipEndOfLine();
/// Type checking
assertChar('[', in);
for (size_t i = 0; i < column_indexes_for_input_fields.size(); ++i)
{
skipWhitespaceIfAny(in);
String data_type;
readJSONString(data_type, in);
if (column_indexes_for_input_fields[i] &&
data_types[*column_indexes_for_input_fields[i]]->getName() != data_type)
{
throw Exception(
"Type of '" + getPort().getHeader().getByPosition(*column_indexes_for_input_fields[i]).name
+ "' must be " + data_types[*column_indexes_for_input_fields[i]]->getName() +
", not " + data_type,
ErrorCodes::INCORRECT_DATA
);
}
if (i != column_indexes_for_input_fields.size() - 1)
assertChar(',', in);
skipWhitespaceIfAny(in);
}
assertChar(']', in);
}
else
{
size_t num_columns = getPort().getHeader().columns();
read_columns.assign(num_columns, true);
column_indexes_for_input_fields.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
column_indexes_for_input_fields[i] = i;
}
}
for (size_t i = 0; i < read_columns.size(); ++i)
{
if (!read_columns[i])
{
not_seen_columns.emplace_back(i);
}
}
}
void JSONCompactEachRowRowInputFormat::addInputColumn(const String & column_name)
{
names_of_columns.emplace_back(column_name);
const auto column_it = column_indexes_by_names.find(column_name);
if (column_it == column_indexes_by_names.end())
{
if (format_settings.skip_unknown_fields)
{
column_indexes_for_input_fields.push_back(std::nullopt);
return;
}
throw Exception(
"Unknown field found in JSONCompactEachRow header: '" + column_name + "' " +
"at position " + std::to_string(column_indexes_for_input_fields.size()) +
"\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
ErrorCodes::INCORRECT_DATA
);
}
const auto column_index = column_it->second;
if (read_columns[column_index])
throw Exception("Duplicate field found while parsing JSONCompactEachRow header: " + column_name, ErrorCodes::INCORRECT_DATA);
read_columns[column_index] = true;
column_indexes_for_input_fields.emplace_back(column_index);
}
bool JSONCompactEachRowRowInputFormat::readRow(DB::MutableColumns &columns, DB::RowReadExtension &ext)
{
skipEndOfLine();
if (in.eof())
return false;
size_t num_columns = columns.size();
read_columns.assign(num_columns, false);
assertChar('[', in);
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
{
const auto & table_column = column_indexes_for_input_fields[file_column];
if (table_column)
{
readField(*table_column, columns);
}
else
{
skipJSONField(in, StringRef(names_of_columns[file_column]));
}
skipWhitespaceIfAny(in);
if (in.eof())
throw Exception("Unexpected end of stream while parsing JSONCompactEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
if (file_column + 1 != column_indexes_for_input_fields.size())
{
assertChar(',', in);
skipWhitespaceIfAny(in);
}
}
assertChar(']', in);
for (size_t i = 0; i < not_seen_columns.size(); i++)
{
columns[not_seen_columns[i]]->insertDefault();
}
ext.read_columns = read_columns;
return true;
}
void JSONCompactEachRowRowInputFormat::skipEndOfLine()
{
skipWhitespaceIfAny(in);
if (!in.eof() && (*in.position() == ',' || *in.position() == ';'))
++in.position();
skipWhitespaceIfAny(in);
}
void JSONCompactEachRowRowInputFormat::readField(size_t index, MutableColumns & columns)
{
try
{
read_columns[index] = true;
const auto & type = data_types[index];
if (format_settings.null_as_default && !type->isNullable())
read_columns[index] = DataTypeNullable::deserializeTextJSON(*columns[index], in, format_settings, type);
else
type->deserializeAsTextJSON(*columns[index], in, format_settings);
}
catch (Exception & e)
{
e.addMessage("(while read the value of key " + getPort().getHeader().getByPosition(index).name + ")");
throw;
}
}
void JSONCompactEachRowRowInputFormat::syncAfterError()
{
skipToUnescapedNextLineOrEOF(in);
}
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
{
factory.registerInputFormatProcessor("JSONCompactEachRow", [](
ReadBuffer & buf,
const Block & sample,
const Context &,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, false);
});
factory.registerInputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", [](
ReadBuffer & buf,
const Block & sample,
const Context &,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONCompactEachRowRowInputFormat>(buf, sample, std::move(params), settings, true);
});
}
}

View File

@ -0,0 +1,54 @@
#pragma once
#pragma once
#include <Core/Block.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatSettings.h>
#include <Common/HashTable/HashMap.h>
namespace DB
{
class ReadBuffer;
/** A stream for reading data in JSONCompactEachRow and JSONCompactEachRowWithNamesAndTypes formats
*/
class JSONCompactEachRowRowInputFormat : public IRowInputFormat
{
public:
JSONCompactEachRowRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_, bool with_names_);
String getName() const override { return "JSONCompactEachRowRowInputFormat"; }
void readPrefix() override;
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
private:
void addInputColumn(const String & column_name);
void skipEndOfLine();
void readField(size_t index, MutableColumns & columns);
const FormatSettings format_settings;
using IndexesMap = std::unordered_map<String, size_t>;
IndexesMap column_indexes_by_names;
using OptionalIndexes = std::vector<std::optional<size_t>>;
OptionalIndexes column_indexes_for_input_fields;
DataTypes data_types;
std::vector<UInt8> read_columns;
std::vector<size_t> not_seen_columns;
/// This is for the correct exceptions in skipping unknown fields.
std::vector<String> names_of_columns;
bool with_names;
};
}

View File

@ -0,0 +1,116 @@
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h>
#include <Formats/FormatFactory.h>
namespace DB
{
JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const FormatSettings & settings_,
bool with_names_)
: IRowOutputFormat(header_, out_, callback), settings(settings_), with_names(with_names_)
{
auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
fields.assign(columns.begin(), columns.end());
}
void JSONCompactEachRowRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{
type.serializeAsTextJSON(column, row_num, out, settings);
}
void JSONCompactEachRowRowOutputFormat::writeFieldDelimiter()
{
writeCString(", ", out);
}
void JSONCompactEachRowRowOutputFormat::writeRowStartDelimiter()
{
writeChar('[', out);
}
void JSONCompactEachRowRowOutputFormat::writeRowEndDelimiter()
{
writeCString("]\n", out);
}
void JSONCompactEachRowRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
writeChar('\n', out);
size_t num_columns = columns.size();
writeChar('[', out);
for (size_t i = 0; i < num_columns; ++i)
{
if (i != 0)
JSONCompactEachRowRowOutputFormat::writeFieldDelimiter();
JSONCompactEachRowRowOutputFormat::writeField(*columns[i], *types[i], row_num);
}
writeCString("]\n", out);
}
void JSONCompactEachRowRowOutputFormat::writePrefix()
{
if (with_names)
{
writeChar('[', out);
for (size_t i = 0; i < fields.size(); ++i)
{
writeChar('\"', out);
writeString(fields[i].name, out);
writeChar('\"', out);
if (i != fields.size() - 1)
writeCString(", ", out);
}
writeCString("]\n[", out);
for (size_t i = 0; i < fields.size(); ++i)
{
writeJSONString(fields[i].type->getName(), out, settings);
if (i != fields.size() - 1)
writeCString(", ", out);
}
writeCString("]\n", out);
}
}
void JSONCompactEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk)
{
if (with_names)
IRowOutputFormat::consumeTotals(std::move(chunk));
}
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
{
factory.registerOutputFormatProcessor("JSONCompactEachRow", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", [](
WriteBuffer &buf,
const Block &sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings &format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, true);
});
}
}

View File

@ -0,0 +1,45 @@
#pragma once
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Formats/FormatSettings.h>
namespace DB
{
/** The stream for outputting data in JSON format, by object per line.
* Does not validate UTF-8.
*/
class JSONCompactEachRowRowOutputFormat : public IRowOutputFormat
{
public:
JSONCompactEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_, bool with_names);
String getName() const override { return "JSONCompactEachRowRowOutputFormat"; }
void writePrefix() override;
void writeBeforeTotals() override {}
void writeTotals(const Columns & columns, size_t row_num) override;
void writeAfterTotals() override {}
void writeField(const IColumn & column, const IDataType & type, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
protected:
void consumeTotals(Chunk) override;
/// No extremes.
void consumeExtremes(Chunk) override {}
private:
FormatSettings settings;
NamesAndTypes fields;
bool with_names;
};
}

View File

@ -32,6 +32,8 @@
<value>CSVWithNames</value>
<value>Values</value>
<value>JSONEachRow</value>
<value>JSONCompactEachRow</value>
<value>JSONCompactEachRowWithNamesAndTypes</value>
<value>TSKV</value>
<value>RowBinary</value>
<value>Native</value>

View File

@ -34,6 +34,7 @@
<value>JSON</value>
<value>JSONCompact</value>
<value>JSONEachRow</value>
<value>JSONCompactEachRow</value>
<value>TSKV</value>
<value>Pretty</value>
<value>PrettyCompact</value>

View File

@ -0,0 +1,47 @@
1
[1, "a"]
[2, "b"]
[3, "c"]
2
["a", "1"]
["b", "1"]
["c", "1"]
3
["value", "name"]
["UInt8", "String"]
[1, "a"]
[2, "b"]
[3, "c"]
4
["name", "c"]
["String", "UInt64"]
["a", "1"]
["b", "1"]
["c", "1"]
["", "3"]
5
["first", 1, 2, 0]
["second", 2, 0, 6]
6
["first", 1, 2, 8]
["second", 2, 32, 6]
7
[16, [15,16,0], ["first","second","third"]]
8
["first", 1, 2, 0]
["second", 2, 0, 6]
9
["first", 1, 2, 8]
["second", 2, 32, 6]
10
["first", 1, 16, 8]
["second", 2, 32, 8]
11
["v1", "v2", "v3", "v4"]
["String", "UInt8", "UInt16", "UInt8"]
["", 2, 3, 1]
12
["v1", "n.id", "n.name"]
["UInt8", "Array(UInt8)", "Array(String)"]
[16, [15,16,0], ["first","second","third"]]

View File

@ -0,0 +1,63 @@
DROP TABLE IF EXISTS test_table;
DROP TABLE IF EXISTS test_table_2;
SELECT 1;
/* Check JSONCompactEachRow Output */
CREATE TABLE test_table (value UInt8, name String) ENGINE = MergeTree() ORDER BY value;
INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c');
SELECT * FROM test_table FORMAT JSONCompactEachRow;
SELECT 2;
/* Check Totals */
SELECT name, count() AS c FROM test_table GROUP BY name WITH TOTALS ORDER BY name FORMAT JSONCompactEachRow;
SELECT 3;
/* Check JSONCompactEachRowWithNamesAndTypes Output */
SELECT * FROM test_table FORMAT JSONCompactEachRowWithNamesAndTypes;
SELECT 4;
/* Check Totals */
SELECT name, count() AS c FROM test_table GROUP BY name WITH TOTALS ORDER BY name FORMAT JSONCompactEachRowWithNamesAndTypes;
DROP TABLE IF EXISTS test_table;
SELECT 5;
/* Check JSONCompactEachRow Input */
CREATE TABLE test_table (v1 String, v2 UInt8, v3 DEFAULT v2 * 16, v4 UInt8 DEFAULT 8) ENGINE = MergeTree() ORDER BY v2;
INSERT INTO test_table FORMAT JSONCompactEachRow ["first", 1, "2", null] ["second", 2, null, 6];
SELECT * FROM test_table FORMAT JSONCompactEachRow;
TRUNCATE TABLE test_table;
SELECT 6;
/* Check input_format_null_as_default = 1 */
SET input_format_null_as_default = 1;
INSERT INTO test_table FORMAT JSONCompactEachRow ["first", 1, "2", null] ["second", 2, null, 6];
SELECT * FROM test_table FORMAT JSONCompactEachRow;
TRUNCATE TABLE test_table;
SELECT 7;
/* Check Nested */
CREATE TABLE test_table_2 (v1 UInt8, n Nested(id UInt8, name String)) ENGINE = MergeTree() ORDER BY v1;
INSERT INTO test_table_2 FORMAT JSONCompactEachRow [16, [15, 16, null], ["first", "second", "third"]];
SELECT * FROM test_table_2 FORMAT JSONCompactEachRow;
TRUNCATE TABLE test_table_2;
SELECT 8;
/* Check JSONCompactEachRowWithNamesAndTypes Output */
SET input_format_null_as_default = 0;
INSERT INTO test_table FORMAT JSONCompactEachRowWithNamesAndTypes ["v1", "v2", "v3", "v4"]["String","UInt8","UInt16","UInt8"]["first", 1, "2", null]["second", 2, null, 6];
SELECT * FROM test_table FORMAT JSONCompactEachRow;
TRUNCATE TABLE test_table;
SELECT 9;
/* Check input_format_null_as_default = 1 */
SET input_format_null_as_default = 1;
INSERT INTO test_table FORMAT JSONCompactEachRowWithNamesAndTypes ["v1", "v2", "v3", "v4"]["String","UInt8","UInt16","UInt8"]["first", 1, "2", null] ["second", 2, null, 6];
SELECT * FROM test_table FORMAT JSONCompactEachRow;
SELECT 10;
/* Check Header */
TRUNCATE TABLE test_table;
SET input_format_skip_unknown_fields = 1;
INSERT INTO test_table FORMAT JSONCompactEachRowWithNamesAndTypes ["v1", "v2", "invalid_column"]["String", "UInt8", "UInt8"]["first", 1, 32]["second", 2, "64"];
SELECT * FROM test_table FORMAT JSONCompactEachRow;
SELECT 11;
TRUNCATE TABLE test_table;
INSERT INTO test_table FORMAT JSONCompactEachRowWithNamesAndTypes ["v4", "v2", "v3"]["UInt8", "UInt8", "UInt16"][1, 2, 3]
SELECT * FROM test_table FORMAT JSONCompactEachRowWithNamesAndTypes;
SELECT 12;
/* Check Nested */
INSERT INTO test_table_2 FORMAT JSONCompactEachRowWithNamesAndTypes ["v1", "n.id", "n.name"]["UInt8", "Array(UInt8)", "Array(String)"][16, [15, 16, null], ["first", "second", "third"]];
SELECT * FROM test_table_2 FORMAT JSONCompactEachRowWithNamesAndTypes;
DROP TABLE IF EXISTS test_table;
DROP TABLE IF EXISTS test_table_2;