diff --git a/docs/en/interfaces/formats.md b/docs/en/interfaces/formats.md index 8892c6d8d3f..df96b8129f1 100644 --- a/docs/en/interfaces/formats.md +++ b/docs/en/interfaces/formats.md @@ -39,6 +39,7 @@ The supported formats are: | [JSONCompact](#jsoncompact) | ✔ | ✔ | | [JSONCompactStrings](#jsoncompactstrings) | ✗ | ✔ | | [JSONCompactColumns](#jsoncompactcolumns) | ✔ | ✔ | +| [JSONCompactWithProgress](#jsoncompactwithprogress) | ✗ | ✔ | | [JSONEachRow](#jsoneachrow) | ✔ | ✔ | | [PrettyJSONEachRow](#prettyjsoneachrow) | ✗ | ✔ | | [JSONEachRowWithProgress](#jsoneachrowwithprogress) | ✗ | ✔ | @@ -988,6 +989,59 @@ Example: Columns that are not present in the block will be filled with default values (you can use [input_format_defaults_for_omitted_fields](/docs/en/operations/settings/settings-formats.md/#input_format_defaults_for_omitted_fields) setting here) +## JSONCompactWithProgress (#jsoncompactwithprogress) + +In this format, ClickHouse outputs each row as a separated, newline-delimited JSON Object. + +Each row is either a metadata object, data object, progress information or statistics object: + +1. **Metadata Object (`meta`)** + - Describes the structure of the data rows. + - Fields: `name` (column name), `type` (data type, e.g., `UInt32`, `String`, etc.). + - Example: `{"meta": [{"name":"id", "type":"UInt32"}, {"name":"name", "type":"String"}]}` + - Appears before any data objects. + +2. **Data Object (`data`)** + - Represents a row of query results. + - Fields: An array with values corresponding to the columns defined in the metadata. + - Example: `{"data":["1", "John Doe"]}` + - Appears after the metadata object, one per row. + +3. **Progress Information Object (`progress`)** + - Provides real-time progress feedback during query execution. + - Fields: `read_rows`, `read_bytes`, `written_rows`, `written_bytes`, `total_rows_to_read`, `result_rows`, `result_bytes`, `elapsed_ns`. + - Example: `{"progress":{"read_rows":"8","read_bytes":"168"}}` + - May appear intermittently. + +4. **Statistics Object (`statistics`)** + - Summarizes query execution statistics. + - Fields: `rows`, `rows_before_limit_at_least`, `elapsed`, `rows_read`, `bytes_read`. + - Example: `{"statistics": {"rows":2, "elapsed":0.001995, "rows_read":8}}` + - Appears at the end. + +5. **Exception Object (`exception`)** + - Represents an error that occurred during query execution. + - Fields: A single text field containing the error message. + - Example: `{"exception": "Code: 395. DB::Exception: Value passed to 'throwIf' function is non-zero..."}` + - Appears when an error is encountered. + +6. **Totals Object (`totals`)** + - Provides the totals for each numeric column in the result set. + - Fields: An array with total values corresponding to the columns defined in the metadata. + - Example: `{"totals": ["", "3"]}` + - Appears at the end of the data rows, if applicable. + +Example: + +```json +{"meta": [{"name":"id", "type":"UInt32"}, {"name":"name", "type":"String"}]} +{"progress":{"read_rows":"8","read_bytes":"168","written_rows":"0","written_bytes":"0","total_rows_to_read":"2","result_rows":"0","result_bytes":"0","elapsed_ns":"0"}} +{"data":["1", "John Doe"]} +{"data":["2", "Joe Doe"]} +{"statistics": {"rows":2, "rows_before_limit_at_least":8, "elapsed":0.001995, "rows_read":8, "bytes_read":168}} +``` + + ## JSONEachRow {#jsoneachrow} In this format, ClickHouse outputs each row as a separated, newline-delimited JSON Object. diff --git a/src/Formats/JSONUtils.cpp b/src/Formats/JSONUtils.cpp index 9d898cd2470..123f2e4f608 100644 --- a/src/Formats/JSONUtils.cpp +++ b/src/Formats/JSONUtils.cpp @@ -483,6 +483,33 @@ namespace JSONUtils writeArrayEnd(out, 1); } + + void writeCompactMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out) + { + writeCompactArrayStart(out, 0, "meta"); + + for (size_t i = 0; i < names.size(); ++i) + { + writeCompactObjectStart(out); + writeTitle("name", out, 0, ""); + + /// The field names are pre-escaped to be put into JSON string literal. + writeChar('"', out); + writeString(names[i], out); + writeChar('"', out); + + writeFieldCompactDelimiter(out); + writeTitle("type", out, 0, ""); + writeJSONString(types[i]->getName(), out, settings); + writeCompactObjectEnd(out); + + if (i + 1 < names.size()) + writeFieldCompactDelimiter(out); + } + + writeCompactArrayEnd(out); + } + void writeAdditionalInfo( size_t rows, size_t rows_before_limit, @@ -530,6 +557,45 @@ namespace JSONUtils } } + void writeCompactAdditionalInfo( + size_t rows, + size_t rows_before_limit, + bool applied_limit, + const Stopwatch & watch, + const Progress & progress, + bool write_statistics, + WriteBuffer & out) + { + writeCompactObjectStart(out); + writeCompactObjectStart(out, 0, "statistics"); + writeTitle("rows", out, 0, ""); + writeIntText(rows, out); + + if (applied_limit) + { + writeFieldCompactDelimiter(out); + writeTitle("rows_before_limit_at_least", out, 0, ""); + writeIntText(rows_before_limit, out); + } + + if (write_statistics) + { + writeFieldCompactDelimiter(out); + writeTitle("elapsed", out, 0, ""); + writeText(watch.elapsedSeconds(), out); + writeFieldCompactDelimiter(out); + + writeTitle("rows_read", out, 0, ""); + writeText(progress.read_rows.load(), out); + writeFieldCompactDelimiter(out); + + writeTitle("bytes_read", out, 0, ""); + writeText(progress.read_bytes.load(), out); + } + writeCompactObjectEnd(out); + writeCompactObjectEnd(out); + } + void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent) { writeTitle("exception", out, indent, " "); diff --git a/src/Formats/JSONUtils.h b/src/Formats/JSONUtils.h index e2ac3467971..622703947b9 100644 --- a/src/Formats/JSONUtils.h +++ b/src/Formats/JSONUtils.h @@ -99,6 +99,7 @@ namespace JSONUtils WriteBuffer & out); void writeMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out); + void writeCompactMetadata(const Names & names, const DataTypes & types, const FormatSettings & settings, WriteBuffer & out); void writeAdditionalInfo( size_t rows, @@ -111,6 +112,15 @@ namespace JSONUtils bool write_statistics, WriteBuffer & out); + void writeCompactAdditionalInfo( + size_t rows, + size_t rows_before_limit, + bool applied_limit, + const Stopwatch & watch, + const Progress & progress, + bool write_statistics, + WriteBuffer & out); + void writeException(const String & exception_message, WriteBuffer & out, const FormatSettings & settings, size_t indent = 0); void skipColon(ReadBuffer & in); diff --git a/src/Formats/registerFormats.cpp b/src/Formats/registerFormats.cpp index 57ca1bb49c8..770b747fafd 100644 --- a/src/Formats/registerFormats.cpp +++ b/src/Formats/registerFormats.cpp @@ -95,6 +95,7 @@ void registerOutputFormatMarkdown(FormatFactory & factory); void registerOutputFormatPostgreSQLWire(FormatFactory & factory); void registerOutputFormatPrometheus(FormatFactory & factory); void registerOutputFormatSQLInsert(FormatFactory & factory); +void registerOutputFormatJSONCompactWithProgress(FormatFactory & factory); /// Input only formats. @@ -242,6 +243,7 @@ void registerFormats() registerOutputFormatCapnProto(factory); registerOutputFormatPrometheus(factory); registerOutputFormatSQLInsert(factory); + registerOutputFormatJSONCompactWithProgress(factory); registerInputFormatRegexp(factory); registerInputFormatJSONAsString(factory); diff --git a/src/Processors/Formats/Impl/JSONCompactWithProgressRowOutputFormat.cpp b/src/Processors/Formats/Impl/JSONCompactWithProgressRowOutputFormat.cpp new file mode 100644 index 00000000000..e90864ecdf3 --- /dev/null +++ b/src/Processors/Formats/Impl/JSONCompactWithProgressRowOutputFormat.cpp @@ -0,0 +1,154 @@ +#include +#include +#include + +#include + + +namespace DB +{ + +JSONCompactWithProgressRowOutputFormat::JSONCompactWithProgressRowOutputFormat( + WriteBuffer & out_, const Block & header, const FormatSettings & settings_, bool yield_strings_) + : JSONRowOutputFormat(out_, header, settings_, yield_strings_) +{ +} + +void JSONCompactWithProgressRowOutputFormat::writePrefix() +{ + JSONUtils::writeCompactObjectStart(*ostr); + JSONUtils::writeCompactMetadata(names, types, settings, *ostr); + JSONUtils::writeCompactObjectEnd(*ostr); + writeCString("\n", *ostr); +} + +void JSONCompactWithProgressRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) +{ + JSONUtils::writeFieldFromColumn(column, serialization, row_num, yield_strings, settings, *ostr); + ++field_number; +} + +void JSONCompactWithProgressRowOutputFormat::writeFieldDelimiter() +{ + JSONUtils::writeFieldCompactDelimiter(*ostr); +} + +void JSONCompactWithProgressRowOutputFormat::writeRowStartDelimiter() +{ + if (has_progress) + writeProgress(); + writeCString("{\"data\":", *ostr); + JSONUtils::writeCompactArrayStart(*ostr); +} + +void JSONCompactWithProgressRowOutputFormat::writeRowEndDelimiter() +{ + JSONUtils::writeCompactArrayEnd(*ostr); + writeCString("}\n", *ostr); + field_number = 0; + ++row_count; +} + +void JSONCompactWithProgressRowOutputFormat::writeRowBetweenDelimiter() +{ +} + +void JSONCompactWithProgressRowOutputFormat::writeBeforeTotals() +{ + JSONUtils::writeCompactObjectStart(*ostr); + JSONUtils::writeCompactArrayStart(*ostr, 0, "totals"); +} + +void JSONCompactWithProgressRowOutputFormat::writeTotals(const Columns & columns, size_t row_num) +{ + JSONUtils::writeCompactColumns(columns, serializations, row_num, yield_strings, settings, *ostr); +} + +void JSONCompactWithProgressRowOutputFormat::writeAfterTotals() +{ + JSONUtils::writeCompactArrayEnd(*ostr); + JSONUtils::writeCompactObjectEnd(*ostr); + writeCString("\n", *ostr); +} + +void JSONCompactWithProgressRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num) +{ + JSONUtils::writeCompactArrayStart(*ostr, 2, title); + JSONUtils::writeCompactColumns(columns, serializations, row_num, yield_strings, settings, *ostr); + JSONUtils::writeCompactArrayEnd(*ostr); +} + +void JSONCompactWithProgressRowOutputFormat::onProgress(const Progress & value) +{ + statistics.progress.incrementPiecewiseAtomically(value); + String progress_line; + WriteBufferFromString buf(progress_line); + writeCString("{\"progress\":", buf); + statistics.progress.writeJSON(buf); + writeCString("}\n", buf); + buf.finalize(); + std::lock_guard lock(progress_lines_mutex); + progress_lines.emplace_back(std::move(progress_line)); + has_progress = true; +} + + +void JSONCompactWithProgressRowOutputFormat::flush() +{ + if (has_progress) + writeProgress(); + JSONRowOutputFormat::flush(); +} + +void JSONCompactWithProgressRowOutputFormat::writeSuffix() +{ + if (has_progress) + writeProgress(); +} + +void JSONCompactWithProgressRowOutputFormat::writeProgress() +{ + std::lock_guard lock(progress_lines_mutex); + for (const auto & progress_line : progress_lines) + writeString(progress_line, *ostr); + progress_lines.clear(); + has_progress = false; +} + +void JSONCompactWithProgressRowOutputFormat::finalizeImpl() +{ + if (exception_message.empty()) + { + JSONUtils::writeCompactAdditionalInfo( + row_count, + statistics.rows_before_limit, + statistics.applied_limit, + statistics.watch, + statistics.progress, + settings.write_statistics, + *ostr); + } + else + { + JSONUtils::writeCompactObjectStart(*ostr); + JSONUtils::writeException(exception_message, *ostr, settings, 0); + JSONUtils::writeCompactObjectEnd(*ostr); + } + writeCString("\n", *ostr); + ostr->next(); +} + +void registerOutputFormatJSONCompactWithProgress(FormatFactory & factory) +{ + factory.registerOutputFormat( + "JSONCompactWithProgress", + [](WriteBuffer & buf, const Block & sample, const FormatSettings & format_settings) + { return std::make_shared(buf, sample, format_settings, false); }); + + factory.registerOutputFormat( + "JSONCompactWithProgressStrings", + [](WriteBuffer & buf, const Block & sample, const FormatSettings & format_settings) + { return std::make_shared(buf, sample, format_settings, true); }); +} + +} diff --git a/src/Processors/Formats/Impl/JSONCompactWithProgressRowOutputFormat.h b/src/Processors/Formats/Impl/JSONCompactWithProgressRowOutputFormat.h new file mode 100644 index 00000000000..1c21914d8cb --- /dev/null +++ b/src/Processors/Formats/Impl/JSONCompactWithProgressRowOutputFormat.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ + +struct FormatSettings; + +class JSONCompactWithProgressRowOutputFormat final : public JSONRowOutputFormat +{ +public: + JSONCompactWithProgressRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & settings_, bool yield_strings_); + + String getName() const override { return "JSONCompactWithProgressRowOutputFormat"; } + + void onProgress(const Progress & value) override; + void flush() override; + +private: + void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; + void writeFieldDelimiter() override; + void writeRowStartDelimiter() override; + void writeRowEndDelimiter() override; + void writeRowBetweenDelimiter() override; + bool supportTotals() const override { return true; } + bool supportExtremes() const override { return true; } + void writeBeforeTotals() override; + void writeAfterTotals() override; + void writeExtremesElement(const char * title, const Columns & columns, size_t row_num) override; + void writeTotals(const Columns & columns, size_t row_num) override; + + void writeProgress(); + void writePrefix() override; + void writeSuffix() override; + void finalizeImpl() override; + + + std::vector progress_lines; + std::mutex progress_lines_mutex; + /// To not lock mutex and check progress_lines every row, + /// we will use atomic flag that progress_lines is not empty. + std::atomic_bool has_progress = false; +}; + +} diff --git a/tests/queries/0_stateless/03174_json_compact_with_progress.reference b/tests/queries/0_stateless/03174_json_compact_with_progress.reference new file mode 100644 index 00000000000..cdbe7cfcb3e --- /dev/null +++ b/tests/queries/0_stateless/03174_json_compact_with_progress.reference @@ -0,0 +1,15 @@ +1 +{"meta": [{"name":"value", "type":"UInt8"}, {"name":"name", "type":"String"}]} +{"data":[1, "a"]} +{"data":[2, "b"]} +{"data":[3, "c"]} +{"statistics": {"rows":3, "elapsed":ELAPSED, "rows_read":3, "bytes_read":33}} +2 +{"meta": [{"name":"name", "type":"String"}, {"name":"c", "type":"UInt64"}]} +{"data":["a", "1"]} +{"data":["b", "1"]} +{"data":["c", "1"]} +{"totals": ["", "3"]} +{"statistics": {"rows":3, "elapsed":ELAPSED, "rows_read":3, "bytes_read":30}} +3 +Value passed to 'throwIf' function is non-zero: diff --git a/tests/queries/0_stateless/03174_json_compact_with_progress.sh b/tests/queries/0_stateless/03174_json_compact_with_progress.sh new file mode 100755 index 00000000000..b15dc7cfdb2 --- /dev/null +++ b/tests/queries/0_stateless/03174_json_compact_with_progress.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_table;" + +$CLICKHOUSE_CLIENT -q "SELECT 1;" +# Check JSONCompactWithProgress Output +$CLICKHOUSE_CLIENT -q "CREATE TABLE test_table (value UInt8, name String) ENGINE = MergeTree() ORDER BY value;" +$CLICKHOUSE_CLIENT -q "INSERT INTO test_table VALUES (1, 'a'), (2, 'b'), (3, 'c');" +$CLICKHOUSE_CLIENT -q "SELECT * FROM test_table FORMAT JSONCompactWithProgress settings max_block_size=2;" | grep -v --text "progress" | sed -E 's/"elapsed":[0-9]+\.[0-9]+/"elapsed":ELAPSED/g' + +$CLICKHOUSE_CLIENT -q "SELECT 2;" +# Check Totals +$CLICKHOUSE_CLIENT -q "SELECT name, count() AS c FROM test_table GROUP BY name WITH TOTALS ORDER BY name FORMAT JSONCompactWithProgress settings max_block_size=2;" | grep -v --text "progress" | sed -E 's/"elapsed":[0-9]+\.[0-9]+/"elapsed":ELAPSED/g' + +$CLICKHOUSE_CLIENT -q "SELECT 3;" +# Check exceptions +${CLICKHOUSE_CURL} -sS "$CLICKHOUSE_URL" -d "SELECT throwIf(number = 15), 1::Int64 as a, '\"' from numbers(100) format JSONCompactWithProgress settings output_format_json_quote_64bit_integers=1, max_block_size=10" | grep "exception" | cut -c42-88 + +$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_table;" diff --git a/utils/check-style/aspell-ignore/en/aspell-dict.txt b/utils/check-style/aspell-ignore/en/aspell-dict.txt index 0e08a5f8540..8c39a3d5994 100644 --- a/utils/check-style/aspell-ignore/en/aspell-dict.txt +++ b/utils/check-style/aspell-ignore/en/aspell-dict.txt @@ -421,6 +421,7 @@ JSONCompactStringsEachRowWithNames JSONCompactStringsEachRowWithNamesAndTypes JSONDynamicPaths JSONDynamicPathsWithTypes +JSONCompactWithProgress JSONEachRow JSONEachRowWithProgress JSONExtract @@ -1916,6 +1917,7 @@ jsoncompactstrings jsoncompactstringseachrow jsoncompactstringseachrowwithnames jsoncompactstringseachrowwithnamesandtypes +jsoncompactwithprogress jsoneachrow jsoneachrowwithprogress jsonobjecteachrow