Add JSONCompactWithProgressRowOutputFormat

This commit is contained in:
Alexey Korepanov 2024-06-22 09:30:16 +02:00
parent 31eeeae4fd
commit f9b70ea77a
3 changed files with 180 additions and 0 deletions

View File

@ -95,6 +95,7 @@ void registerOutputFormatMarkdown(FormatFactory & factory);
void registerOutputFormatPostgreSQLWire(FormatFactory & factory);
void registerOutputFormatPrometheus(FormatFactory & factory);
void registerOutputFormatSQLInsert(FormatFactory & factory);
void registerOutputFormatJSONCompactWithProgress(FormatFactory & factory);
/// Input only formats.
@ -242,6 +243,7 @@ void registerFormats()
registerOutputFormatCapnProto(factory);
registerOutputFormatPrometheus(factory);
registerOutputFormatSQLInsert(factory);
registerOutputFormatJSONCompactWithProgress(factory);
registerInputFormatRegexp(factory);
registerInputFormatJSONAsString(factory);

View File

@ -0,0 +1,125 @@
#include <Processors/Formats/Impl/JSONCompactWithProgressRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <Formats/JSONUtils.h>
#include <IO/WriteHelpers.h>
#include <Common/logger_useful.h>
namespace DB
{
/// Constructs the format by forwarding every argument unchanged to the
/// JSONRowOutputFormat base class; the constructor body itself does nothing.
JSONCompactWithProgressRowOutputFormat::JSONCompactWithProgressRowOutputFormat(
    WriteBuffer & out_, const Block & header, const FormatSettings & settings_, bool yield_strings_)
    : JSONRowOutputFormat(out_, header, settings_, yield_strings_)
{
}
/// Writes a single value of the current row and counts it.
///
/// @param column         Column that holds the value.
/// @param serialization  Serialization used to render the value.
/// @param row_num        Index of the row inside `column`.
///
/// The value is rendered by JSONUtils::writeFieldFromColumn using this format's
/// `yield_strings` flag and `settings`, after which `field_number` is incremented.
void JSONCompactWithProgressRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row_num)
{
    JSONUtils::writeFieldFromColumn(column, serialization, row_num, yield_strings, settings, *ostr);

    /// Deliberately no logging here: this function runs once per value written,
    /// so a debug message per call would dominate the cost of formatting.
    ++field_number;
}
/// Writes the delimiter between fields by delegating to
/// JSONUtils::writeFieldCompactDelimiter on the output buffer.
void JSONCompactWithProgressRowOutputFormat::writeFieldDelimiter()
{
    auto & out = *ostr;
    JSONUtils::writeFieldCompactDelimiter(out);
}
/// Begins a new row. Progress lines queued by onProgress() are written out
/// first, so they are emitted before the row's opening bracket.
void JSONCompactWithProgressRowOutputFormat::writeRowStartDelimiter()
{
    /// Only the atomic flag is consulted here; the mutex is taken inside writeProgress().
    const bool pending_progress = has_progress;
    if (pending_progress)
        writeProgress();

    /// The second argument is presumably the indentation level - confirm against JSONUtils.
    JSONUtils::writeCompactArrayStart(*ostr, 2);
}
/// Finishes the current row: closes the compact array, counts the row and
/// resets the per-row field counter for the next one.
void JSONCompactWithProgressRowOutputFormat::writeRowEndDelimiter()
{
    JSONUtils::writeCompactArrayEnd(*ostr);

    ++row_count;
    field_number = 0;
}
/// Opens the "totals" section: writes a field delimiter and then starts a
/// compact array titled "totals".
void JSONCompactWithProgressRowOutputFormat::writeBeforeTotals()
{
    auto & out = *ostr;

    JSONUtils::writeFieldDelimiter(out, 2);
    JSONUtils::writeCompactArrayStart(out, 1, "totals");
}
/// Writes the totals row.
///
/// @param columns  Columns that hold the totals values.
/// @param row_num  Index of the row to take from each column.
///
/// The row is rendered by JSONUtils::writeCompactColumns with this format's
/// serializations, `yield_strings` flag and settings.
void JSONCompactWithProgressRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
    auto & out = *ostr;
    JSONUtils::writeCompactColumns(columns, serializations, row_num, yield_strings, settings, out);
}
/// Closes the compact array that writeBeforeTotals() opened.
void JSONCompactWithProgressRowOutputFormat::writeAfterTotals()
{
    auto & out = *ostr;
    JSONUtils::writeCompactArrayEnd(out);
}
/// Writes one element of the extremes section as a titled compact array.
///
/// @param title    Title passed to JSONUtils::writeCompactArrayStart.
/// @param columns  Columns that hold the extreme values.
/// @param row_num  Index of the row to take from each column.
void JSONCompactWithProgressRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num)
{
    auto & out = *ostr;

    JSONUtils::writeCompactArrayStart(out, 2, title);
    JSONUtils::writeCompactColumns(columns, serializations, row_num, yield_strings, settings, out);
    JSONUtils::writeCompactArrayEnd(out);
}
void JSONCompactWithProgressRowOutputFormat::onProgress(const Progress & value)
{
LOG_DEBUG(getLogger("JSONCompactWithProgressRowOutputFormat"), "onProgress: {}", value.read_rows);
progress.incrementPiecewiseAtomically(value);
String progress_line;
WriteBufferFromString buf(progress_line);
writeCString("{\"progress\":", buf);
progress.writeJSON(buf);
writeCString("}\n", buf);
buf.finalize();
std::lock_guard lock(progress_lines_mutex);
progress_lines.emplace_back(std::move(progress_line));
has_progress = true;
}
/// Writes any queued progress lines and then performs the base-class flush.
void JSONCompactWithProgressRowOutputFormat::flush()
{
    const bool pending_progress = has_progress;
    if (pending_progress)
        writeProgress();

    JSONRowOutputFormat::flush();
}
/// Writes any queued progress lines and then the base-class suffix.
void JSONCompactWithProgressRowOutputFormat::writeSuffix()
{
    const bool pending_progress = has_progress;
    if (pending_progress)
        writeProgress();

    JSONRowOutputFormat::writeSuffix();
}
void JSONCompactWithProgressRowOutputFormat::writeProgress()
{
std::lock_guard lock(progress_lines_mutex);
for (const auto & progress_line : progress_lines)
writeString(progress_line, *ostr);
progress_lines.clear();
has_progress = false;
}
/// Registers the "JSONCompactWithProgress" output format in the factory and
/// marks it as supporting parallel formatting.
///
/// NOTE(review): the format keeps queued progress lines per instance; confirm that
/// onProgress() reaches the instance that writes output when parallel formatting is used.
void registerOutputFormatJSONCompactWithProgress(FormatFactory & factory)
{
    /// The format is always created with yield_strings = false.
    auto create_format = [](WriteBuffer & buf, const Block & sample, const FormatSettings & format_settings)
    {
        return std::make_shared<JSONCompactWithProgressRowOutputFormat>(buf, sample, format_settings, false);
    };

    factory.registerOutputFormat("JSONCompactWithProgress", create_format);
    factory.markOutputFormatSupportsParallelFormatting("JSONCompactWithProgress");
}
}

View File

@ -0,0 +1,53 @@
#pragma once
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
namespace DB
{
struct FormatSettings;
/** Output format built on JSONRowOutputFormat that writes rows, totals and extremes
  * through the JSONUtils compact-array helpers and additionally emits
  * `{"progress":...}` lines for the progress updates it receives.
  *
  * Progress updates are not written immediately: onProgress() serializes them into
  * a queue, and the queue is drained to the output before a row is started, on
  * flush() and when the suffix is written.
  *
  * NOTE(review): std::vector, std::mutex and std::atomic_bool are used below without
  * direct includes of <vector>, <mutex> and <atomic> - presumably they arrive
  * transitively; confirm.
  */
class JSONCompactWithProgressRowOutputFormat final : public JSONRowOutputFormat
{
public:
/// All arguments are forwarded to the JSONRowOutputFormat constructor.
JSONCompactWithProgressRowOutputFormat(
WriteBuffer & out_,
const Block & header,
const FormatSettings & settings_,
bool yield_strings_);
String getName() const override { return "JSONCompactWithProgressRowOutputFormat"; }
/// Adds `value` to the accumulated progress and queues a serialized progress line.
void onProgress(const Progress & value) override;
/// Writes queued progress lines (if any), then calls the base-class flush().
void flush() override;
private:
/// Row and field writers.
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
/// Totals and extremes are both supported by this format.
bool supportTotals() const override { return true; }
bool supportExtremes() const override { return true; }
void writeBeforeTotals() override;
void writeAfterTotals() override;
void writeExtremesElement(const char * title, const Columns & columns, size_t row_num) override;
void writeTotals(const Columns & columns, size_t row_num) override;
/// Writes every queued progress line to the output, clears the queue and resets has_progress.
void writeProgress();
/// Writes queued progress lines (if any), then the base-class suffix.
void writeSuffix() override;
/// Progress accumulated from all onProgress() calls.
Progress progress;
/// Serialized progress lines waiting to be written; accessed under progress_lines_mutex.
std::vector<String> progress_lines;
/// Guards progress_lines.
std::mutex progress_lines_mutex;
/// To avoid locking the mutex and checking progress_lines on every row,
/// an atomic flag signals that progress_lines is not empty.
std::atomic_bool has_progress = false;
};
}