Merge pull request #66205 from alexkorep/output-format-json-compact-with-progress

JSONCompactWithProgress query output format
This commit is contained in:
Raúl Marín 2024-09-09 09:30:42 +00:00 committed by GitHub
commit d968de6d26
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 376 additions and 0 deletions

View File

@ -39,6 +39,7 @@ The supported formats are:
| [JSONCompact](#jsoncompact) | ✔ | ✔ |
| [JSONCompactStrings](#jsoncompactstrings) | ✗ | ✔ |
| [JSONCompactColumns](#jsoncompactcolumns) | ✔ | ✔ |
| [JSONCompactWithProgress](#jsoncompactwithprogress) | ✗ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
| [PrettyJSONEachRow](#prettyjsoneachrow) | ✗ | ✔ |
| [JSONEachRowWithProgress](#jsoneachrowwithprogress) | ✗ | ✔ |
@ -988,6 +989,59 @@ Example:
Columns that are not present in the block will be filled with default values (you can use [input_format_defaults_for_omitted_fields](/docs/en/operations/settings/settings-formats.md/#input_format_defaults_for_omitted_fields) setting here)
## JSONCompactWithProgress (#jsoncompactwithprogress)
In this format, ClickHouse outputs each row as a separated, newline-delimited JSON Object.
Each row is either a metadata object, data object, progress information or statistics object:
1. **Metadata Object (`meta`)**
- Describes the structure of the data rows.
- Fields: `name` (column name), `type` (data type, e.g., `UInt32`, `String`, etc.).
- Example: `{"meta": [{"name":"id", "type":"UInt32"}, {"name":"name", "type":"String"}]}`
- Appears before any data objects.
2. **Data Object (`data`)**
- Represents a row of query results.
- Fields: An array with values corresponding to the columns defined in the metadata.
- Example: `{"data":["1", "John Doe"]}`
- Appears after the metadata object, one per row.
3. **Progress Information Object (`progress`)**
- Provides real-time progress feedback during query execution.
- Fields: `read_rows`, `read_bytes`, `written_rows`, `written_bytes`, `total_rows_to_read`, `result_rows`, `result_bytes`, `elapsed_ns`.
- Example: `{"progress":{"read_rows":"8","read_bytes":"168"}}`
- May appear intermittently.
4. **Statistics Object (`statistics`)**
- Summarizes query execution statistics.
- Fields: `rows`, `rows_before_limit_at_least`, `elapsed`, `rows_read`, `bytes_read`.
- Example: `{"statistics": {"rows":2, "elapsed":0.001995, "rows_read":8}}`
- Appears at the end.
5. **Exception Object (`exception`)**
- Represents an error that occurred during query execution.
- Fields: A single text field containing the error message.
- Example: `{"exception": "Code: 395. DB::Exception: Value passed to 'throwIf' function is non-zero..."}`
- Appears when an error is encountered.
6. **Totals Object (`totals`)**
- Provides the totals for each numeric column in the result set.
- Fields: An array with total values corresponding to the columns defined in the metadata.
- Example: `{"totals": ["", "3"]}`
- Appears at the end of the data rows, if applicable.
Example:
```json
{"meta": [{"name":"id", "type":"UInt32"}, {"name":"name", "type":"String"}]}
{"progress":{"read_rows":"8","read_bytes":"168","written_rows":"0","written_bytes":"0","total_rows_to_read":"2","result_rows":"0","result_bytes":"0","elapsed_ns":"0"}}
{"data":["1", "John Doe"]}
{"data":["2", "Joe Doe"]}
{"statistics": {"rows":2, "rows_before_limit_at_least":8, "elapsed":0.001995, "rows_read":8, "bytes_read":168}}
```
## JSONEachRow {#jsoneachrow}
In this format, ClickHouse outputs each row as a separated, newline-delimited JSON Object.

View File

@ -483,6 +483,33 @@ namespace JSONUtils
writeArrayEnd(out, 1);
}
void writeCompactMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out)
{
writeCompactArrayStart(out, 0, "meta");
for (size_t i = 0; i < names.size(); ++i)
{
writeCompactObjectStart(out);
writeTitle("name", out, 0, "");
/// The field names are pre-escaped to be put into JSON string literal.
writeChar('"', out);
writeString(names[i], out);
writeChar('"', out);
writeFieldCompactDelimiter(out);
writeTitle("type", out, 0, "");
writeJSONString(types[i]->getName(), out, settings);
writeCompactObjectEnd(out);
if (i + 1 < names.size())
writeFieldCompactDelimiter(out);
}
writeCompactArrayEnd(out);
}
void writeAdditionalInfo(
size_t rows,
size_t rows_before_limit,
@ -530,6 +557,45 @@ namespace JSONUtils
}
}
void writeCompactAdditionalInfo(
size_t rows,
size_t rows_before_limit,
bool applied_limit,
const Stopwatch & watch,
const Progress & progress,
bool write_statistics,
WriteBuffer & out)
{
writeCompactObjectStart(out);
writeCompactObjectStart(out, 0, "statistics");
writeTitle("rows", out, 0, "");
writeIntText(rows, out);
if (applied_limit)
{
writeFieldCompactDelimiter(out);
writeTitle("rows_before_limit_at_least", out, 0, "");
writeIntText(rows_before_limit, out);
}
if (write_statistics)
{
writeFieldCompactDelimiter(out);
writeTitle("elapsed", out, 0, "");
writeText(watch.elapsedSeconds(), out);
writeFieldCompactDelimiter(out);
writeTitle("rows_read", out, 0, "");
writeText(progress.read_rows.load(), out);
writeFieldCompactDelimiter(out);
writeTitle("bytes_read", out, 0, "");
writeText(progress.read_bytes.load(), out);
}
writeCompactObjectEnd(out);
writeCompactObjectEnd(out);
}
void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent)
{
writeTitle("exception", out, indent, " ");

View File

@ -99,6 +99,7 @@ namespace JSONUtils
WriteBuffer & out);
void writeMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out);
void writeCompactMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out);
void writeAdditionalInfo(
size_t rows,
@ -111,6 +112,15 @@ namespace JSONUtils
bool write_statistics,
WriteBuffer & out);
void writeCompactAdditionalInfo(
size_t rows,
size_t rows_before_limit,
bool applied_limit,
const Stopwatch & watch,
const Progress & progress,
bool write_statistics,
WriteBuffer & out);
void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent = 0);
void skipColon(ReadBuffer & in);

View File

@ -95,6 +95,7 @@ void registerOutputFormatMarkdown(FormatFactory & factory);
void registerOutputFormatPostgreSQLWire(FormatFactory & factory);
void registerOutputFormatPrometheus(FormatFactory & factory);
void registerOutputFormatSQLInsert(FormatFactory & factory);
void registerOutputFormatJSONCompactWithProgress(FormatFactory & factory);
/// Input only formats.
@ -242,6 +243,7 @@ void registerFormats()
registerOutputFormatCapnProto(factory);
registerOutputFormatPrometheus(factory);
registerOutputFormatSQLInsert(factory);
registerOutputFormatJSONCompactWithProgress(factory);
registerInputFormatRegexp(factory);
registerInputFormatJSONAsString(factory);

View File

@ -0,0 +1,154 @@
#include <Formats/FormatFactory.h>
#include <Formats/JSONUtils.h>
#include <Processors/Formats/Impl/JSONCompactWithProgressRowOutputFormat.h>
#include <IO/WriteHelpers.h>
namespace DB
{
JSONCompactWithProgressRowOutputFormat::JSONCompactWithProgressRowOutputFormat(
WriteBuffer & out_, const Block & header, const FormatSettings & settings_, bool yield_strings_)
: JSONRowOutputFormat(out_, header, settings_, yield_strings_)
{
}
void JSONCompactWithProgressRowOutputFormat::writePrefix()
{
JSONUtils::writeCompactObjectStart(*ostr);
JSONUtils::writeCompactMetadata(names, types, settings, *ostr);
JSONUtils::writeCompactObjectEnd(*ostr);
writeCString("\n", *ostr);
}
void JSONCompactWithProgressRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
{
JSONUtils::writeFieldFromColumn(column, serialization, row_num, yield_strings, settings, *ostr);
++field_number;
}
void JSONCompactWithProgressRowOutputFormat::writeFieldDelimiter()
{
JSONUtils::writeFieldCompactDelimiter(*ostr);
}
void JSONCompactWithProgressRowOutputFormat::writeRowStartDelimiter()
{
if (has_progress)
writeProgress();
writeCString("{\"data\":", *ostr);
JSONUtils::writeCompactArrayStart(*ostr);
}
void JSONCompactWithProgressRowOutputFormat::writeRowEndDelimiter()
{
JSONUtils::writeCompactArrayEnd(*ostr);
writeCString("}\n", *ostr);
field_number = 0;
++row_count;
}
void JSONCompactWithProgressRowOutputFormat::writeRowBetweenDelimiter()
{
}
void JSONCompactWithProgressRowOutputFormat::writeBeforeTotals()
{
JSONUtils::writeCompactObjectStart(*ostr);
JSONUtils::writeCompactArrayStart(*ostr, 0, "totals");
}
void JSONCompactWithProgressRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
JSONUtils::writeCompactColumns(columns, serializations, row_num, yield_strings, settings, *ostr);
}
void JSONCompactWithProgressRowOutputFormat::writeAfterTotals()
{
JSONUtils::writeCompactArrayEnd(*ostr);
JSONUtils::writeCompactObjectEnd(*ostr);
writeCString("\n", *ostr);
}
void JSONCompactWithProgressRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num)
{
JSONUtils::writeCompactArrayStart(*ostr, 2, title);
JSONUtils::writeCompactColumns(columns, serializations, row_num, yield_strings, settings, *ostr);
JSONUtils::writeCompactArrayEnd(*ostr);
}
void JSONCompactWithProgressRowOutputFormat::onProgress(const Progress & value)
{
statistics.progress.incrementPiecewiseAtomically(value);
String progress_line;
WriteBufferFromString buf(progress_line);
writeCString("{\"progress\":", buf);
statistics.progress.writeJSON(buf);
writeCString("}\n", buf);
buf.finalize();
std::lock_guard lock(progress_lines_mutex);
progress_lines.emplace_back(std::move(progress_line));
has_progress = true;
}
void JSONCompactWithProgressRowOutputFormat::flush()
{
if (has_progress)
writeProgress();
JSONRowOutputFormat::flush();
}
void JSONCompactWithProgressRowOutputFormat::writeSuffix()
{
if (has_progress)
writeProgress();
}
void JSONCompactWithProgressRowOutputFormat::writeProgress()
{
std::lock_guard lock(progress_lines_mutex);
for (const auto & progress_line : progress_lines)
writeString(progress_line, *ostr);
progress_lines.clear();
has_progress = false;
}
void JSONCompactWithProgressRowOutputFormat::finalizeImpl()
{
if (exception_message.empty())
{
JSONUtils::writeCompactAdditionalInfo(
row_count,
statistics.rows_before_limit,
statistics.applied_limit,
statistics.watch,
statistics.progress,
settings.write_statistics,
*ostr);
}
else
{
JSONUtils::writeCompactObjectStart(*ostr);
JSONUtils::writeException(exception_message, *ostr, settings, 0);
JSONUtils::writeCompactObjectEnd(*ostr);
}
writeCString("\n", *ostr);
ostr->next();
}
void registerOutputFormatJSONCompactWithProgress(FormatFactory & factory)
{
factory.registerOutputFormat(
"JSONCompactWithProgress",
[](WriteBuffer & buf, const Block & sample, const FormatSettings & format_settings)
{ return std::make_shared<JSONCompactWithProgressRowOutputFormat>(buf, sample, format_settings, false); });
factory.registerOutputFormat(
"JSONCompactWithProgressStrings",
[](WriteBuffer & buf, const Block & sample, const FormatSettings & format_settings)
{ return std::make_shared<JSONCompactWithProgressRowOutputFormat>(buf, sample, format_settings, true); });
}
}

View File

@ -0,0 +1,50 @@
#pragma once
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
namespace DB
{
struct FormatSettings;
class JSONCompactWithProgressRowOutputFormat final : public JSONRowOutputFormat
{
public:
JSONCompactWithProgressRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & settings_, bool yield_strings_);
String getName() const override { return "JSONCompactWithProgressRowOutputFormat"; }
void onProgress(const Progress & value) override;
void flush() override;
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
bool supportTotals() const override { return true; }
bool supportExtremes() const override { return true; }
void writeBeforeTotals() override;
void writeAfterTotals() override;
void writeExtremesElement(const char * title, const Columns & columns, size_t row_num) override;
void writeTotals(const Columns & columns, size_t row_num) override;
void writeProgress();
void writePrefix() override;
void writeSuffix() override;
void finalizeImpl() override;
std::vector<String> progress_lines;
std::mutex progress_lines_mutex;
/// To not lock mutex and check progress_lines every row,
/// we will use atomic flag that progress_lines is not empty.
std::atomic_bool has_progress = false;
};
}

View File

@ -0,0 +1,15 @@
1
{"meta": [{"name":"value", "type":"UInt8"}, {"name":"name", "type":"String"}]}
{"data":[1, "a"]}
{"data":[2, "b"]}
{"data":[3, "c"]}
{"statistics": {"rows":3, "elapsed":ELAPSED, "rows_read":3, "bytes_read":33}}
2
{"meta": [{"name":"name", "type":"String"}, {"name":"c", "type":"UInt64"}]}
{"data":["a", "1"]}
{"data":["b", "1"]}
{"data":["c", "1"]}
{"totals": ["", "3"]}
{"statistics": {"rows":3, "elapsed":ELAPSED, "rows_read":3, "bytes_read":30}}
3
Value passed to 'throwIf' function is non-zero:

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_table;"
$CLICKHOUSE_CLIENT -q "SELECT 1;"
# Check JSONCompactWithProgress Output
$CLICKHOUSE_CLIENT -q "CREATE TABLE test_table (value UInt8, name String) ENGINE = MergeTree() ORDER BY value;"
$CLICKHOUSE_CLIENT -q "INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c');"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_table FORMAT JSONCompactWithProgress settings max_block_size=2;" | grep -v --text "progress" | sed -E 's/"elapsed":[0-9]+\.[0-9]+/"elapsed":ELAPSED/g'
$CLICKHOUSE_CLIENT -q "SELECT 2;"
# Check Totals
$CLICKHOUSE_CLIENT -q "SELECT name, count() AS c FROM test_table GROUP BY name WITH TOTALS ORDER BY name FORMAT JSONCompactWithProgress settings max_block_size=2;" | grep -v --text "progress" | sed -E 's/"elapsed":[0-9]+\.[0-9]+/"elapsed":ELAPSED/g'
$CLICKHOUSE_CLIENT -q "SELECT 3;"
# Check exceptions
${CLICKHOUSE_CURL} -sS "$CLICKHOUSE_URL" -d "SELECT throwIf(number = 15), 1::Int64 as a, '\"' from numbers(100) format JSONCompactWithProgress settings output_format_json_quote_64bit_integers=1, max_block_size=10" | grep "exception" | cut -c42-88
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_table;"

View File

@ -421,6 +421,7 @@ JSONCompactStringsEachRowWithNames
JSONCompactStringsEachRowWithNamesAndTypes
JSONDynamicPaths
JSONDynamicPathsWithTypes
JSONCompactWithProgress
JSONEachRow
JSONEachRowWithProgress
JSONExtract
@ -1916,6 +1917,7 @@ jsoncompactstrings
jsoncompactstringseachrow
jsoncompactstringseachrowwithnames
jsoncompactstringseachrowwithnamesandtypes
jsoncompactwithprogress
jsoneachrow
jsoneachrowwithprogress
jsonobjecteachrow