ClickHouse/src/Processors/Formats/Impl/JSONRowOutputFormat.cpp

303 lines
7.7 KiB
C++
Raw Normal View History

2019-02-19 18:41:18 +00:00
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
#include <Formats/FormatFactory.h>
namespace DB
{
2020-09-02 04:05:02 +00:00
/// Constructs the format on top of `out_`.
///
/// Two things are prepared up front:
///  - `fields` is filled from the header of the main port, and every column name is replaced
///    by its JSON-escaped form so that later code can emit the names verbatim;
///  - `ostr` is pointed either directly at `out` or at a UTF-8 validating wrapper around it,
///    depending on whether at least one column type may produce text that is not valid UTF-8.
///
/// `yield_strings_` selects between serializeTextJSON (false) and serializeText wrapped
/// into a JSON string (true) when values are written.
JSONRowOutputFormat::JSONRowOutputFormat(
    WriteBuffer & out_,
    const Block & header,
    const RowOutputFormatParams & params_,
    const FormatSettings & settings_,
    bool yield_strings_)
    : IRowOutputFormat(header, out_, params_), settings(settings_), yield_strings(yield_strings_)
{
    const auto & sample = getPort(PortKind::Main).getHeader();

    NamesAndTypesList columns(sample.getNamesAndTypesList());
    fields.assign(columns.begin(), columns.end());

    bool need_validate_utf8 = false;
    const size_t num_columns = sample.columns();
    for (size_t pos = 0; pos < num_columns; ++pos)
    {
        const bool only_valid_utf8 = sample.getByPosition(pos).type->textCanContainOnlyValidUTF8();
        if (!only_valid_utf8)
            need_validate_utf8 = true;

        auto & column_name = fields[pos].name;

        WriteBufferFromOwnString escaped;
        {
            /// The validating buffer lives in its own scope and is gone before `escaped.str()` is read
            /// (presumably so that everything it holds is pushed into `escaped` first).
            WriteBufferValidUTF8 validating_buf(escaped);
            writeJSONString(column_name, validating_buf, settings);
        }
        column_name = escaped.str();
    }

    if (!need_validate_utf8)
    {
        ostr = &out;
        return;
    }

    validating_ostr = std::make_unique<WriteBufferValidUTF8>(out);
    ostr = validating_ostr.get();
}
void JSONRowOutputFormat::writePrefix()
{
writeCString("{\n", *ostr);
writeCString("\t\"meta\":\n", *ostr);
writeCString("\t[\n", *ostr);
for (size_t i = 0; i < fields.size(); ++i)
{
writeCString("\t\t{\n", *ostr);
writeCString("\t\t\t\"name\": ", *ostr);
writeString(fields[i].name, *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\t\"type\": ", *ostr);
writeJSONString(fields[i].type->getName(), *ostr, settings);
writeChar('\n', *ostr);
writeCString("\t\t}", *ostr);
if (i + 1 < fields.size())
writeChar(',', *ostr);
writeChar('\n', *ostr);
}
writeCString("\t],\n", *ostr);
writeChar('\n', *ostr);
writeCString("\t\"data\":\n", *ostr);
writeCString("\t[\n", *ostr);
}
2021-03-09 14:46:52 +00:00
void JSONRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
2019-02-19 18:41:18 +00:00
{
writeCString("\t\t\t", *ostr);
writeString(fields[field_number].name, *ostr);
writeCString(": ", *ostr);
2020-09-02 04:05:02 +00:00
if (yield_strings)
{
WriteBufferFromOwnString buf;
2021-03-09 14:46:52 +00:00
serialization.serializeText(column, row_num, buf, settings);
2020-09-02 04:05:02 +00:00
writeJSONString(buf.str(), *ostr, settings);
}
else
2021-03-09 14:46:52 +00:00
serialization.serializeTextJSON(column, row_num, *ostr, settings);
2020-09-02 04:05:02 +00:00
2019-02-19 18:41:18 +00:00
++field_number;
}
2021-03-09 14:46:52 +00:00
void JSONRowOutputFormat::writeTotalsField(const IColumn & column, const ISerialization & serialization, size_t row_num)
2019-02-19 18:41:18 +00:00
{
writeCString("\t\t", *ostr);
2019-04-12 15:43:09 +00:00
writeString(fields[field_number].name, *ostr);
2019-02-19 18:41:18 +00:00
writeCString(": ", *ostr);
2020-09-02 04:05:02 +00:00
if (yield_strings)
{
WriteBufferFromOwnString buf;
2021-03-09 14:46:52 +00:00
serialization.serializeText(column, row_num, buf, settings);
2020-09-02 04:05:02 +00:00
writeJSONString(buf.str(), *ostr, settings);
}
else
2021-03-09 14:46:52 +00:00
serialization.serializeTextJSON(column, row_num, *ostr, settings);
2020-09-02 04:05:02 +00:00
2019-02-19 18:41:18 +00:00
++field_number;
}
/// Separates two fields of the same object: a comma followed by a line break.
void JSONRowOutputFormat::writeFieldDelimiter()
{
    auto & dst = *ostr;
    writeCString(",\n", dst);
}
/// Opens the object of a data row (two tabs of indentation).
void JSONRowOutputFormat::writeRowStartDelimiter()
{
    auto & dst = *ostr;
    writeCString("\t\t{\n", dst);
}
/// Closes the object of a data row, counts the row and rewinds `field_number`
/// so that the next row starts from the first field again.
void JSONRowOutputFormat::writeRowEndDelimiter()
{
    writeCString(
        "\n"
        "\t\t}",
        *ostr);

    ++row_count;
    field_number = 0;
}
/// Separates two consecutive row objects: a comma followed by a line break.
void JSONRowOutputFormat::writeRowBetweenDelimiter()
{
    auto & dst = *ostr;
    writeCString(",\n", dst);
}
/// Closes the array opened at the end of writePrefix: a line break, then a tab-indented `]`.
void JSONRowOutputFormat::writeSuffix()
{
    writeCString(
        "\n"
        "\t]",
        *ostr);
}
/// Terminates the previous top-level element with a comma, leaves an empty line
/// and opens the "totals" object.
void JSONRowOutputFormat::writeBeforeTotals()
{
    writeCString(
        ",\n"
        "\n"
        "\t\"totals\":\n"
        "\t{\n",
        *ostr);
}
void JSONRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
size_t num_columns = columns.size();
for (size_t i = 0; i < num_columns; ++i)
{
if (i != 0)
2019-04-12 15:59:51 +00:00
writeTotalsFieldDelimiter();
2019-02-19 18:41:18 +00:00
2021-03-09 14:46:52 +00:00
writeTotalsField(*columns[i], *serializations[i], row_num);
2019-02-19 18:41:18 +00:00
}
}
/// Closes the "totals" object and rewinds `field_number`, which writeTotalsField has advanced.
void JSONRowOutputFormat::writeAfterTotals()
{
    writeCString(
        "\n"
        "\t}",
        *ostr);

    field_number = 0;
}
/// Terminates the previous top-level element with a comma, leaves an empty line
/// and opens the "extremes" object.
void JSONRowOutputFormat::writeBeforeExtremes()
{
    writeCString(
        ",\n"
        "\n"
        "\t\"extremes\":\n"
        "\t{\n",
        *ostr);
}
void JSONRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num)
{
writeCString("\t\t\"", *ostr);
writeCString(title, *ostr);
writeCString("\":\n", *ostr);
writeCString("\t\t{\n", *ostr);
size_t extremes_columns = columns.size();
for (size_t i = 0; i < extremes_columns; ++i)
{
if (i != 0)
2019-04-12 16:06:38 +00:00
writeFieldDelimiter();
2019-02-19 18:41:18 +00:00
2021-03-09 14:46:52 +00:00
writeField(*columns[i], *serializations[i], row_num);
2019-02-19 18:41:18 +00:00
}
writeChar('\n', *ostr);
writeCString("\t\t}", *ostr);
field_number = 0;
}
/// Writes row `row_num` of `columns` as the "min" member of the extremes object.
void JSONRowOutputFormat::writeMinExtreme(const Columns & columns, size_t row_num)
{
    const char * const title = "min";
    writeExtremesElement(title, columns, row_num);
}
/// Writes row `row_num` of `columns` as the "max" member of the extremes object.
void JSONRowOutputFormat::writeMaxExtreme(const Columns & columns, size_t row_num)
{
    const char * const title = "max";
    writeExtremesElement(title, columns, row_num);
}
/// Closes the "extremes" object: a line break, then a tab-indented `}`.
void JSONRowOutputFormat::writeAfterExtremes()
{
    writeCString(
        "\n"
        "\t}",
        *ostr);
}
void JSONRowOutputFormat::finalizeImpl()
2019-02-19 18:41:18 +00:00
{
writeCString(",\n\n", *ostr);
writeCString("\t\"rows\": ", *ostr);
writeIntText(row_count, *ostr);
auto outside_statistics = getOutsideStatistics();
if (outside_statistics)
2021-11-19 13:45:10 +00:00
statistics = std::move(*outside_statistics);
2019-02-19 18:41:18 +00:00
writeRowsBeforeLimitAtLeast();
if (settings.write_statistics)
writeStatistics();
writeChar('\n', *ostr);
writeCString("}\n", *ostr);
ostr->next();
}
void JSONRowOutputFormat::writeRowsBeforeLimitAtLeast()
{
if (statistics.applied_limit)
2019-02-19 18:41:18 +00:00
{
writeCString(",\n\n", *ostr);
writeCString("\t\"rows_before_limit_at_least\": ", *ostr);
writeIntText(statistics.rows_before_limit, *ostr);
2019-02-19 18:41:18 +00:00
}
}
/// Writes the "statistics" object, preceded by a comma and an empty line.
/// It has three members: "elapsed" (seconds from `statistics.watch`),
/// "rows_read" and "bytes_read" (current values of the progress counters).
void JSONRowOutputFormat::writeStatistics()
{
    auto & dst = *ostr;
    auto & progress = statistics.progress;

    writeCString(
        ",\n\n"
        "\t\"statistics\":\n"
        "\t{\n",
        dst);

    writeCString("\t\t\"elapsed\": ", dst);
    writeText(statistics.watch.elapsedSeconds(), dst);
    writeCString(",\n", dst);

    writeCString("\t\t\"rows_read\": ", dst);
    writeText(progress.read_rows.load(), dst);
    writeCString(",\n", dst);

    writeCString("\t\t\"bytes_read\": ", dst);
    writeText(progress.read_bytes.load(), dst);

    writeCString(
        "\n"
        "\t}",
        dst);
}
/// Adds the reported progress increment `value` to the counters kept in `statistics`.
void JSONRowOutputFormat::onProgress(const Progress & value)
{
    auto & accumulated = statistics.progress;
    accumulated.incrementPiecewiseAtomically(value);
}
2021-10-11 16:11:50 +00:00
/// Registers the "JSON" and "JSONStrings" output formats in `factory`.
/// Both are backed by JSONRowOutputFormat and differ only in the `yield_strings_` flag
/// passed to its constructor (false for "JSON", true for "JSONStrings").
/// Each format is additionally marked as supporting parallel formatting
/// and as not supporting append.
void registerOutputFormatJSON(FormatFactory & factory)
{
    /// Performs the three registration steps for one format name.
    auto register_format = [&factory](const char * format_name, bool yield_strings)
    {
        factory.registerOutputFormat(format_name, [yield_strings](
            WriteBuffer & buf,
            const Block & sample,
            const RowOutputFormatParams & params,
            const FormatSettings & format_settings)
        {
            return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, yield_strings);
        });

        factory.markOutputFormatSupportsParallelFormatting(format_name);
        factory.markFormatDoesntSupportAppend(format_name);
    };

    register_format("JSON", false);
    register_format("JSONStrings", true);
}
}