Update JSONCompactWithProgressRowOutputFormat to print JSON on each row

This commit is contained in:
Alexey Korepanov 2024-07-07 21:12:26 +02:00
parent e1c60c4e40
commit 88736a0d74
4 changed files with 115 additions and 2 deletions

View File

@ -483,6 +483,33 @@ namespace JSONUtils
writeArrayEnd(out, 1); writeArrayEnd(out, 1);
} }
void writeCompactMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out)
{
writeCompactArrayStart(out, 0, "meta");
for (size_t i = 0; i < names.size(); ++i)
{
writeCompactObjectStart(out);
writeTitle("name", out, 0, "");
/// The field names are pre-escaped to be put into JSON string literal.
writeChar('"', out);
writeString(names[i], out);
writeChar('"', out);
writeFieldCompactDelimiter(out);
writeTitle("type", out, 0, "");
writeJSONString(types[i]->getName(), out, settings);
writeCompactObjectEnd(out);
if (i + 1 < names.size())
writeFieldCompactDelimiter(out);
}
writeCompactArrayEnd(out);
}
void writeAdditionalInfo( void writeAdditionalInfo(
size_t rows, size_t rows,
size_t rows_before_limit, size_t rows_before_limit,
@ -523,6 +550,43 @@ namespace JSONUtils
} }
} }
void writeCompactAdditionalInfo(
size_t rows,
size_t rows_before_limit,
bool applied_limit,
const Stopwatch & watch,
const Progress & progress,
bool write_statistics,
WriteBuffer & out)
{
writeCompactObjectStart(out, 0, "statistics");
writeTitle("rows", out, 0, "");
writeIntText(rows, out);
writeFieldCompactDelimiter(out);
if (applied_limit)
{
writeTitle("rows_before_limit_at_least", out, 0, "");
writeIntText(rows_before_limit, out);
writeFieldCompactDelimiter(out);
}
if (write_statistics)
{
writeTitle("elapsed", out, 0, "");
writeText(watch.elapsedSeconds(), out);
writeFieldCompactDelimiter(out);
writeTitle("rows_read", out, 0, "");
writeText(progress.read_rows.load(), out);
writeFieldCompactDelimiter(out);
writeTitle("bytes_read", out, 0, "");
writeText(progress.read_bytes.load(), out);
}
writeCompactObjectEnd(out);
}
void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent) void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent)
{ {
writeTitle("exception", out, indent, " "); writeTitle("exception", out, indent, " ");

View File

@ -99,6 +99,7 @@ namespace JSONUtils
WriteBuffer & out); WriteBuffer & out);
void writeMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out); void writeMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out);
void writeCompactMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out);
void writeAdditionalInfo( void writeAdditionalInfo(
size_t rows, size_t rows,
@ -109,6 +110,15 @@ namespace JSONUtils
bool write_statistics, bool write_statistics,
WriteBuffer & out); WriteBuffer & out);
void writeCompactAdditionalInfo(
size_t rows,
size_t rows_before_limit,
bool applied_limit,
const Stopwatch & watch,
const Progress & progress,
bool write_statistics,
WriteBuffer & out);
void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent = 0); void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent = 0);
void skipColon(ReadBuffer & in); void skipColon(ReadBuffer & in);

View File

@ -17,6 +17,14 @@ JSONCompactWithProgressRowOutputFormat::JSONCompactWithProgressRowOutputFormat(
{ {
} }
void JSONCompactWithProgressRowOutputFormat::writePrefix()
{
JSONUtils::writeCompactObjectStart(*ostr);
JSONUtils::writeCompactMetadata(names, types, settings, *ostr);
JSONUtils::writeCompactObjectEnd(*ostr);
writeCString("}\n", *ostr);
}
void JSONCompactWithProgressRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) void JSONCompactWithProgressRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
{ {
JSONUtils::writeFieldFromColumn(column, serialization, row_num, yield_strings, settings, *ostr); JSONUtils::writeFieldFromColumn(column, serialization, row_num, yield_strings, settings, *ostr);
@ -32,16 +40,22 @@ void JSONCompactWithProgressRowOutputFormat::writeRowStartDelimiter()
{ {
if (has_progress) if (has_progress)
writeProgress(); writeProgress();
JSONUtils::writeCompactArrayStart(*ostr, 2); writeCString("{\"data\":", *ostr);
JSONUtils::writeCompactArrayStart(*ostr);
} }
void JSONCompactWithProgressRowOutputFormat::writeRowEndDelimiter() void JSONCompactWithProgressRowOutputFormat::writeRowEndDelimiter()
{ {
JSONUtils::writeCompactArrayEnd(*ostr); JSONUtils::writeCompactArrayEnd(*ostr);
writeCString("}\n", *ostr);
field_number = 0; field_number = 0;
++row_count; ++row_count;
} }
void JSONCompactWithProgressRowOutputFormat::writeRowBetweenDelimiter()
{
}
void JSONCompactWithProgressRowOutputFormat::writeBeforeTotals() void JSONCompactWithProgressRowOutputFormat::writeBeforeTotals()
{ {
JSONUtils::writeFieldDelimiter(*ostr, 2); JSONUtils::writeFieldDelimiter(*ostr, 2);
@ -91,7 +105,6 @@ void JSONCompactWithProgressRowOutputFormat::writeSuffix()
{ {
if (has_progress) if (has_progress)
writeProgress(); writeProgress();
JSONRowOutputFormat::writeSuffix();
} }
void JSONCompactWithProgressRowOutputFormat::writeProgress() void JSONCompactWithProgressRowOutputFormat::writeProgress()
@ -103,6 +116,28 @@ void JSONCompactWithProgressRowOutputFormat::writeProgress()
has_progress = false; has_progress = false;
} }
void JSONCompactWithProgressRowOutputFormat::finalizeImpl()
{
JSONUtils::writeCompactAdditionalInfo(
row_count,
statistics.rows_before_limit,
statistics.applied_limit,
statistics.watch,
statistics.progress,
settings.write_statistics && exception_message.empty(),
*ostr);
exception_message = "Test exception message.";
if (!exception_message.empty())
{
writeCString("\n", *ostr);
JSONUtils::writeCompactObjectStart(*ostr);
JSONUtils::writeException(exception_message, *ostr, settings, 0);
JSONUtils::writeCompactObjectEnd(*ostr);
}
ostr->next();
}
void registerOutputFormatJSONCompactWithProgress(FormatFactory & factory) void registerOutputFormatJSONCompactWithProgress(FormatFactory & factory)
{ {
factory.registerOutputFormat("JSONCompactWithProgress", []( factory.registerOutputFormat("JSONCompactWithProgress", [](

View File

@ -32,6 +32,7 @@ private:
void writeFieldDelimiter() override; void writeFieldDelimiter() override;
void writeRowStartDelimiter() override; void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override; void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
bool supportTotals() const override { return true; } bool supportTotals() const override { return true; }
bool supportExtremes() const override { return true; } bool supportExtremes() const override { return true; }
void writeBeforeTotals() override; void writeBeforeTotals() override;
@ -40,7 +41,10 @@ private:
void writeTotals(const Columns & columns, size_t row_num) override; void writeTotals(const Columns & columns, size_t row_num) override;
void writeProgress(); void writeProgress();
void writePrefix() override;
void writeSuffix() override; void writeSuffix() override;
void finalizeImpl() override;
Progress progress; Progress progress;
std::vector<String> progress_lines; std::vector<String> progress_lines;