Support parallel formatting for all text output formats

This commit is contained in:
avogar 2021-11-17 23:51:46 +03:00 committed by avogar
parent 8216b7863b
commit a900a26691
25 changed files with 217 additions and 104 deletions

View File

@ -141,6 +141,21 @@ Progress & Progress::operator=(Progress && other)
return *this;
}
Progress & Progress::operator=(const Progress & other)
{
read_rows = other.read_rows.load(std::memory_order_relaxed);
read_bytes = other.read_bytes.load(std::memory_order_relaxed);
read_raw_bytes = other.read_raw_bytes.load(std::memory_order_relaxed);
total_rows_to_read = other.total_rows_to_read.load(std::memory_order_relaxed);
total_raw_bytes_to_read = other.total_raw_bytes_to_read.load(std::memory_order_relaxed);
written_rows = other.written_rows.load(std::memory_order_relaxed);
written_bytes = other.written_bytes.load(std::memory_order_relaxed);
return *this;
}
void Progress::read(ReadBuffer & in, UInt64 server_revision)
{
ProgressValues values;

View File

@ -117,6 +117,13 @@ struct Progress
{
*this = std::move(other);
}
Progress & operator=(const Progress & other);
Progress(const Progress & other)
{
*this = other;
}
};

View File

@ -72,7 +72,7 @@ void IOutputFormat::work()
if (rows_before_limit_counter && rows_before_limit_counter->hasAppliedLimit())
setRowsBeforeLimit(rows_before_limit_counter->get());
finalizeImpl();
finalize();
finalized = true;
return;
}
@ -85,10 +85,15 @@ void IOutputFormat::work()
consume(std::move(current_chunk));
break;
case Totals:
writeSuffixIfNot();
if (auto totals = prepareTotals(std::move(current_chunk)))
{
consumeTotals(std::move(totals));
are_totals_written = true;
}
break;
case Extremes:
writeSuffixIfNot();
consumeExtremes(std::move(current_chunk));
break;
}
@ -116,6 +121,7 @@ void IOutputFormat::write(const Block & block)
void IOutputFormat::finalize()
{
writePrefixIfNot();
writeSuffixIfNot();
finalizeImpl();
}

View File

@ -4,7 +4,8 @@
#include <Processors/IProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <IO/Progress.h>
#include <Common/Stopwatch.h>
#include <iostream>
namespace DB
{
@ -59,14 +60,43 @@ public:
virtual bool expectMaterializedColumns() const { return true; }
void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); }
void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); }
void setTotals(const Block & totals)
{
writeSuffixIfNot();
consumeTotals(Chunk(totals.getColumns(), totals.rows()));
are_totals_written = true;
}
void setExtremes(const Block & extremes)
{
writeSuffixIfNot();
consumeExtremes(Chunk(extremes.getColumns(), extremes.rows()));
}
size_t getResultRows() const { return result_rows; }
size_t getResultBytes() const { return result_bytes; }
void doNotWritePrefix() { need_write_prefix = false; }
void setFirstRowNumber(size_t first_row_number_)
{
first_row_number = first_row_number_;
onFirstRowNumberUpdate();
}
struct Statistics
{
Stopwatch watch;
Progress progress;
bool applied_limit = false;
size_t rows_before_limit = 0;
};
void setOutsideStatistics(const Statistics & statistics) { outside_statistics = statistics; }
void setTotalsAreWritten() { are_totals_written = true; }
bool areTotalsWritten() const { return are_totals_written; }
protected:
friend class ParallelFormattingOutputFormat;
@ -75,6 +105,12 @@ protected:
virtual void consumeExtremes(Chunk) {}
virtual void finalizeImpl() {}
virtual void writePrefix() {}
virtual void writeSuffix() {}
virtual void onFirstRowNumberUpdate() {}
size_t getFirstRowNumber() const { return first_row_number; }
std::optional<Statistics> getOutsideStatistics() const { return outside_statistics; }
void writePrefixIfNot()
{
@ -85,6 +121,15 @@ protected:
}
}
void writeSuffixIfNot()
{
if (need_write_suffix)
{
writeSuffix();
need_write_suffix = false;
}
}
WriteBuffer & out;
Chunk current_chunk;
@ -95,11 +140,17 @@ protected:
/// Flush data on each consumed chunk. This is intended for interactive applications to output data as soon as it's ready.
bool auto_flush = false;
bool need_write_prefix = true;
bool need_write_suffix = true;
RowsBeforeLimitCounterPtr rows_before_limit_counter;
private:
size_t first_row_number = 0;
std::optional<Statistics> outside_statistics = std::nullopt;
bool are_totals_written = false;
/// Counters for consumed chunks. Are used for QueryLog.
size_t result_rows = 0;
size_t result_bytes = 0;

View File

@ -27,7 +27,7 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
for (size_t row = 0; row < num_rows; ++row)
{
if (!first_row)
if (!first_row || getFirstRowNumber() != 0)
writeRowBetweenDelimiter();
write(columns, row);
@ -41,8 +41,6 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
void IRowOutputFormat::consumeTotals(DB::Chunk chunk)
{
writeSuffixIfNot();
auto num_rows = chunk.getNumRows();
if (num_rows != 1)
throw Exception("Got " + toString(num_rows) + " in totals chunk, expected 1", ErrorCodes::LOGICAL_ERROR);
@ -56,8 +54,6 @@ void IRowOutputFormat::consumeTotals(DB::Chunk chunk)
void IRowOutputFormat::consumeExtremes(DB::Chunk chunk)
{
writeSuffixIfNot();
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
if (num_rows != 2)
@ -70,12 +66,6 @@ void IRowOutputFormat::consumeExtremes(DB::Chunk chunk)
writeAfterExtremes();
}
void IRowOutputFormat::finalizeImpl()
{
writeSuffixIfNot();
writeLastSuffix();
}
void IRowOutputFormat::write(const Columns & columns, size_t row_num)
{
size_t num_columns = columns.size();

View File

@ -31,7 +31,6 @@ protected:
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override;
void consumeExtremes(Chunk chunk) override;
void finalizeImpl() override;
/** Write a row.
* Default implementation calls methods to write single values and delimiters
@ -51,29 +50,18 @@ protected:
virtual void writeRowEndDelimiter() {} /// delimiter after each row
virtual void writeRowBetweenDelimiter() {} /// delimiter between rows
virtual void writePrefix() override {} /// delimiter before resultset
virtual void writeSuffix() {} /// delimiter after resultset
virtual void writeSuffix() override {} /// delimiter after resultset
virtual void writeBeforeTotals() {}
virtual void writeAfterTotals() {}
virtual void writeBeforeExtremes() {}
virtual void writeAfterExtremes() {}
virtual void writeLastSuffix() {} /// Write something after resultset, totals end extremes.
virtual void finalizeImpl() override {} /// Write something after resultset, totals end extremes.
DataTypes types;
Serializations serializations;
Params params;
bool first_row = true;
private:
void writeSuffixIfNot()
{
if (!suffix_written)
writeSuffix();
suffix_written = true;
}
bool suffix_written = false;
};
}

View File

@ -60,6 +60,7 @@ void registerOutputFormatRowBinary(FormatFactory & factory)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, with_names, with_types, params);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};
registerWithNamesAndTypes("RowBinary", register_func);

View File

@ -42,6 +42,8 @@ void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory)
sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONEachRowWithProgress");
factory.registerOutputFormat("JSONStringsEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
@ -53,6 +55,8 @@ void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory)
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf,
sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONStringsEachRowWithProgress");
}
}

View File

@ -217,12 +217,16 @@ void JSONRowOutputFormat::writeAfterExtremes()
writeCString("\t}", *ostr);
}
void JSONRowOutputFormat::writeLastSuffix()
void JSONRowOutputFormat::finalizeImpl()
{
writeCString(",\n\n", *ostr);
writeCString("\t\"rows\": ", *ostr);
writeIntText(row_count, *ostr);
auto outside_statistics = getOutsideStatistics();
if (outside_statistics)
statistics = outside_statistics.value();
writeRowsBeforeLimitAtLeast();
if (settings.write_statistics)
@ -235,11 +239,11 @@ void JSONRowOutputFormat::writeLastSuffix()
void JSONRowOutputFormat::writeRowsBeforeLimitAtLeast()
{
if (applied_limit)
if (statistics.applied_limit)
{
writeCString(",\n\n", *ostr);
writeCString("\t\"rows_before_limit_at_least\": ", *ostr);
writeIntText(rows_before_limit, *ostr);
writeIntText(statistics.rows_before_limit, *ostr);
}
}
@ -250,13 +254,13 @@ void JSONRowOutputFormat::writeStatistics()
writeCString("\t{\n", *ostr);
writeCString("\t\t\"elapsed\": ", *ostr);
writeText(watch.elapsedSeconds(), *ostr);
writeText(statistics.watch.elapsedSeconds(), *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\"rows_read\": ", *ostr);
writeText(progress.read_rows.load(), *ostr);
writeText(statistics.progress.read_rows.load(), *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\"bytes_read\": ", *ostr);
writeText(progress.read_bytes.load(), *ostr);
writeText(statistics.progress.read_bytes.load(), *ostr);
writeChar('\n', *ostr);
writeCString("\t}", *ostr);
@ -264,7 +268,7 @@ void JSONRowOutputFormat::writeStatistics()
void JSONRowOutputFormat::onProgress(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
statistics.progress.incrementPiecewiseAtomically(value);
}
@ -279,6 +283,8 @@ void registerOutputFormatJSON(FormatFactory & factory)
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, false);
});
factory.markOutputFormatSupportsParallelFormatting("JSON");
factory.registerOutputFormat("JSONStrings", [](
WriteBuffer & buf,
const Block & sample,
@ -287,6 +293,8 @@ void registerOutputFormatJSON(FormatFactory & factory)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, true);
});
factory.markOutputFormatSupportsParallelFormatting("JSONStrings");
}
}

View File

@ -39,8 +39,8 @@ public:
void setRowsBeforeLimit(size_t rows_before_limit_) override
{
applied_limit = true;
rows_before_limit = rows_before_limit_;
statistics.applied_limit = true;
statistics.rows_before_limit = rows_before_limit_;
}
protected:
@ -61,7 +61,7 @@ protected:
void writeBeforeExtremes() override;
void writeAfterExtremes() override;
void writeLastSuffix() override;
void finalizeImpl() override;
virtual void writeTotalsField(const IColumn & column, const ISerialization & serialization, size_t row_num);
virtual void writeExtremesElement(const char * title, const Columns & columns, size_t row_num);
@ -70,17 +70,16 @@ protected:
void writeRowsBeforeLimitAtLeast();
void writeStatistics();
void onFirstRowNumberUpdate() override { row_count = getFirstRowNumber(); }
std::unique_ptr<WriteBuffer> validating_ostr; /// Validates UTF-8 sequences, replaces bad sequences with replacement character.
WriteBuffer * ostr;
size_t field_number = 0;
size_t row_count = 0;
bool applied_limit = false;
size_t rows_before_limit = 0;
NamesAndTypes fields;
Progress progress;
Stopwatch watch;
Statistics statistics;
FormatSettings settings;
bool yield_strings;

View File

@ -65,6 +65,8 @@ void registerOutputFormatMarkdown(FormatFactory & factory)
{
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Markdown");
}
}

View File

@ -53,9 +53,13 @@ namespace DB
unit.segment.resize(0);
unit.status = READY_TO_FORMAT;
unit.type = type;
unit.statistics = statistics;
scheduleFormatterThreadForUnitWithNumber(current_unit_number);
size_t first_row_number = rows_consumed;
if (unit.type == ProcessingUnitType::PLAIN)
rows_consumed += unit.chunk.getNumRows();
scheduleFormatterThreadForUnitWithNumber(current_unit_number, first_row_number);
++writer_unit_number;
}
@ -144,7 +148,7 @@ namespace DB
}
void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number, const ThreadGroupStatusPtr & thread_group)
void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number, size_t first_row_num, const ThreadGroupStatusPtr & thread_group)
{
setThreadName("Formatter");
if (thread_group)
@ -166,6 +170,7 @@ namespace DB
unit.segment.resize(0);
auto formatter = internal_formatter_creator(out_buffer);
formatter->setFirstRowNumber(first_row_num);
switch (unit.type)
{
@ -179,21 +184,35 @@ namespace DB
formatter->consume(std::move(unit.chunk));
break;
}
case ProcessingUnitType::PLAIN_FINISH :
{
formatter->writeSuffix();
break;
}
case ProcessingUnitType::TOTALS :
{
formatter->consumeTotals(std::move(unit.chunk));
are_totals_written = true;
break;
}
case ProcessingUnitType::EXTREMES :
{
if (are_totals_written)
formatter->setTotalsAreWritten();
formatter->consumeExtremes(std::move(unit.chunk));
break;
}
case ProcessingUnitType::FINALIZE :
{
formatter->setOutsideStatistics(unit.statistics);
formatter->finalizeImpl();
break;
}
case ProcessingUnitType::ON_PROGRESS :
{
formatter->onProgress(unit.statistics.progress);
break;
}
}
/// Flush all the data to handmade buffer.
formatter->flush();

View File

@ -4,6 +4,7 @@
#include <Common/Arena.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <base/logger_useful.h>
#include <Common/Exception.h>
#include "IO/WriteBufferFromString.h"
@ -104,8 +105,16 @@ public:
finishAndWait();
}
/// There are no formats which support parallel formatting and progress writing at the same time
void onProgress(const Progress &) override {}
void onProgress(const Progress & value) override
{
statistics.progress.incrementPiecewiseAtomically(value);
addChunk(Chunk{}, ProcessingUnitType::ON_PROGRESS, /*can_throw_exception*/ true);
}
void writeSuffix() override
{
addChunk(Chunk{}, ProcessingUnitType::PLAIN_FINISH, /*can_throw_exception*/ true);
}
String getContentType() const override
{
@ -146,9 +155,11 @@ private:
{
START,
PLAIN,
PLAIN_FINISH,
TOTALS,
EXTREMES,
FINALIZE
FINALIZE,
ON_PROGRESS,
};
void addChunk(Chunk chunk, ProcessingUnitType type, bool can_throw_exception);
@ -160,6 +171,7 @@ private:
Chunk chunk;
Memory<> segment;
size_t actual_memory_size{0};
Statistics statistics;
};
Poco::Event collector_finished{};
@ -186,6 +198,11 @@ private:
std::condition_variable collector_condvar;
std::condition_variable writer_condvar;
size_t rows_consumed = 0;
std::atomic_bool are_totals_written = false;
Statistics statistics;
void finishAndWait();
void onBackgroundException()
@ -200,11 +217,11 @@ private:
collector_condvar.notify_all();
}
void scheduleFormatterThreadForUnitWithNumber(size_t ticket_number)
void scheduleFormatterThreadForUnitWithNumber(size_t ticket_number, size_t first_row_num)
{
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup(), ticket_number]
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup(), ticket_number, first_row_num]
{
formatterThreadFunction(ticket_number, thread_group);
formatterThreadFunction(ticket_number, first_row_num, thread_group);
});
}
@ -212,7 +229,13 @@ private:
void collectorThreadFunction(const ThreadGroupStatusPtr & thread_group);
/// This function is executed in ThreadPool and the only purpose of it is to format one Chunk into a continuous buffer in memory.
void formatterThreadFunction(size_t current_unit_number, const ThreadGroupStatusPtr & thread_group);
void formatterThreadFunction(size_t current_unit_number, size_t first_row_num, const ThreadGroupStatusPtr & thread_group);
void setRowsBeforeLimit(size_t rows_before_limit) override
{
statistics.rows_before_limit = rows_before_limit;
statistics.applied_limit = true;
}
};
}

View File

@ -377,7 +377,6 @@ void PrettyBlockOutputFormat::consume(Chunk chunk)
void PrettyBlockOutputFormat::consumeTotals(Chunk chunk)
{
total_rows = 0;
writeSuffixIfNot();
writeCString("\nTotals:\n", out);
write(chunk, PortKind::Totals);
}
@ -385,7 +384,6 @@ void PrettyBlockOutputFormat::consumeTotals(Chunk chunk)
void PrettyBlockOutputFormat::consumeExtremes(Chunk chunk)
{
total_rows = 0;
writeSuffixIfNot();
writeCString("\nExtremes:\n", out);
write(chunk, PortKind::Extremes);
}
@ -401,11 +399,6 @@ void PrettyBlockOutputFormat::writeSuffix()
}
}
void PrettyBlockOutputFormat::finalizeImpl()
{
writeSuffixIfNot();
}
void registerOutputFormatPretty(FormatFactory & factory)
{
@ -418,6 +411,8 @@ void registerOutputFormatPretty(FormatFactory & factory)
return std::make_shared<PrettyBlockOutputFormat>(buf, sample, format_settings);
});
factory.markOutputFormatSupportsParallelFormatting("Pretty");
factory.registerOutputFormat("PrettyNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
@ -428,6 +423,8 @@ void registerOutputFormatPretty(FormatFactory & factory)
changed_settings.pretty.color = false;
return std::make_shared<PrettyBlockOutputFormat>(buf, sample, changed_settings);
});
factory.markOutputFormatSupportsParallelFormatting("PrettyNoEscapes");
}
}

View File

@ -27,11 +27,8 @@ protected:
void consumeTotals(Chunk) override;
void consumeExtremes(Chunk) override;
void finalizeImpl() override;
size_t total_rows = 0;
size_t terminal_width = 0;
bool suffix_written = false;
size_t row_number_width = 7; // "10000. "
@ -41,16 +38,9 @@ protected:
using WidthsPerColumn = std::vector<Widths>;
virtual void write(const Chunk & chunk, PortKind port_kind);
virtual void writeSuffix();
void writeSuffix() override;
virtual void writeSuffixIfNot()
{
if (!suffix_written)
writeSuffix();
suffix_written = true;
}
void onFirstRowNumberUpdate() override { total_rows = getFirstRowNumber(); }
void calculateWidths(
const Block & header, const Chunk & chunk,

View File

@ -54,7 +54,7 @@ PrettyCompactBlockOutputFormat::PrettyCompactBlockOutputFormat(WriteBuffer & out
{
}
void PrettyCompactBlockOutputFormat::writeSuffixIfNot()
void PrettyCompactBlockOutputFormat::writeSuffix()
{
if (mono_chunk)
{
@ -62,7 +62,7 @@ void PrettyCompactBlockOutputFormat::writeSuffixIfNot()
mono_chunk.clear();
}
PrettyBlockOutputFormat::writeSuffixIfNot();
PrettyBlockOutputFormat::writeSuffix();
}
void PrettyCompactBlockOutputFormat::writeHeader(
@ -218,7 +218,7 @@ void PrettyCompactBlockOutputFormat::write(const Chunk & chunk, PortKind port_ki
}
else
{
/// Should be written from writeSuffixIfNot()
/// Should be written from writeSuffix()
assert(!mono_chunk);
}
}
@ -269,6 +269,8 @@ void registerOutputFormatPrettyCompact(FormatFactory & factory)
});
}
factory.markOutputFormatSupportsParallelFormatting("PrettyCompact");
factory.registerOutputFormat("PrettyCompactNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
@ -279,6 +281,7 @@ void registerOutputFormatPrettyCompact(FormatFactory & factory)
changed_settings.pretty.color = false;
return std::make_shared<PrettyCompactBlockOutputFormat>(buf, sample, changed_settings, false /* mono_block */);
});
factory.markOutputFormatSupportsParallelFormatting("PrettyCompactNoEscapes");
}
}

View File

@ -33,7 +33,7 @@ private:
Chunk mono_chunk;
void writeChunk(const Chunk & chunk, PortKind port_kind);
void writeSuffixIfNot() override;
void writeSuffix() override;
};
}

View File

@ -124,6 +124,8 @@ void registerOutputFormatPrettySpace(FormatFactory & factory)
return std::make_shared<PrettySpaceBlockOutputFormat>(buf, sample, format_settings);
});
factory.markOutputFormatSupportsParallelFormatting("PrettySpace");
factory.registerOutputFormat("PrettySpaceNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
@ -134,6 +136,8 @@ void registerOutputFormatPrettySpace(FormatFactory & factory)
changed_settings.pretty.color = false;
return std::make_shared<PrettySpaceBlockOutputFormat>(buf, sample, changed_settings);
});
factory.markOutputFormatSupportsParallelFormatting("PrettySpaceNoEscapes");
}
}

View File

@ -144,6 +144,9 @@ void TemplateBlockOutputFormat::finalizeImpl()
return;
size_t parts = format.format_idx_to_column_idx.size();
auto outside_statistics = getOutsideStatistics();
if (outside_statistics)
statistics = outside_statistics.value();
for (size_t i = 0; i < parts; ++i)
{
@ -170,7 +173,7 @@ void TemplateBlockOutputFormat::finalizeImpl()
writeValue<size_t, DataTypeUInt64>(row_count, format.escaping_rules[i]);
break;
case ResultsetPart::RowsBeforeLimit:
if (!rows_before_limit_set)
if (!statistics.applied_limit)
format.throwInvalidFormat("Cannot print rows_before_limit for this request", i);
writeValue<size_t, DataTypeUInt64>(rows_before_limit, format.escaping_rules[i]);
break;

View File

@ -20,8 +20,8 @@ public:
String getName() const override { return "TemplateBlockOutputFormat"; }
void setRowsBeforeLimit(size_t rows_before_limit_) override { rows_before_limit = rows_before_limit_; rows_before_limit_set = true; }
void onProgress(const Progress & progress_) override { progress.incrementPiecewiseAtomically(progress_); }
void setRowsBeforeLimit(size_t rows_before_limit_) override { statistics.rows_before_limit = rows_before_limit_; statistics.rows_before_limit = true; }
void onProgress(const Progress & progress_) override { statistics.progress.incrementPiecewiseAtomically(progress_); }
enum class ResultsetPart : size_t
{
@ -48,18 +48,17 @@ private:
void writeRow(const Chunk & chunk, size_t row_num);
template <typename U, typename V> void writeValue(U value, EscapingRule escaping_rule);
void onFirstRowNumberUpdate() override { row_count = getFirstRowNumber(); }
const FormatSettings settings;
Serializations serializations;
ParsedTemplateFormatString format;
ParsedTemplateFormatString row_format;
size_t rows_before_limit = 0;
bool rows_before_limit_set = false;
Chunk totals;
Chunk extremes;
Progress progress;
Stopwatch watch;
Statistics statistics;
size_t row_count = 0;

View File

@ -51,6 +51,8 @@ void registerOutputFormatValues(FormatFactory & factory)
{
return std::make_shared<ValuesRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Values");
}
}

View File

@ -115,7 +115,7 @@ void VerticalRowOutputFormat::writeBeforeTotals()
void VerticalRowOutputFormat::writeBeforeExtremes()
{
if (!was_totals_written)
if (!areTotalsWritten())
writeCString("\n", out);
writeCString("\n", out);
@ -134,7 +134,6 @@ void VerticalRowOutputFormat::writeMaxExtreme(const Columns & columns, size_t ro
void VerticalRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
writeSpecialRow(columns, row_num, "Totals");
was_totals_written = true;
}
void VerticalRowOutputFormat::writeSpecialRow(const Columns & columns, size_t row_num, const char * title)
@ -153,12 +152,7 @@ void VerticalRowOutputFormat::writeSpecialRow(const Columns & columns, size_t ro
writeChar('\n', out);
for (size_t i = 0; i < num_columns; ++i)
{
if (i != 0)
writeFieldDelimiter();
writeField(*columns[i], *serializations[i], row_num);
}
}
void registerOutputFormatVertical(FormatFactory & factory)
@ -171,6 +165,8 @@ void registerOutputFormatVertical(FormatFactory & factory)
{
return std::make_shared<VerticalRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Vertical");
}
}

View File

@ -37,13 +37,14 @@ private:
void writeValue(const IColumn & column, const ISerialization & serialization, size_t row_num) const;
void onFirstRowNumberUpdate() override { row_number = getFirstRowNumber(); }
/// For totals and extremes.
void writeSpecialRow(const Columns & columns, size_t row_num, const char * title);
const FormatSettings format_settings;
size_t field_number = 0;
size_t row_number = 0;
bool was_totals_written = false;
using NamesAndPaddings = std::vector<String>;
NamesAndPaddings names_and_paddings;

View File

@ -195,16 +195,20 @@ void XMLRowOutputFormat::writeExtremesElement(const char * title, const Columns
void XMLRowOutputFormat::onProgress(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
statistics.progress.incrementPiecewiseAtomically(value);
}
void XMLRowOutputFormat::writeLastSuffix()
void XMLRowOutputFormat::finalizeImpl()
{
writeCString("\t<rows>", *ostr);
writeIntText(row_count, *ostr);
writeCString("</rows>\n", *ostr);
auto outside_statistics = getOutsideStatistics();
if (outside_statistics)
statistics = outside_statistics.value();
writeRowsBeforeLimitAtLeast();
if (format_settings.write_statistics)
@ -216,10 +220,10 @@ void XMLRowOutputFormat::writeLastSuffix()
void XMLRowOutputFormat::writeRowsBeforeLimitAtLeast()
{
if (applied_limit)
if (statistics.applied_limit)
{
writeCString("\t<rows_before_limit_at_least>", *ostr);
writeIntText(rows_before_limit, *ostr);
writeIntText(statistics.rows_before_limit, *ostr);
writeCString("</rows_before_limit_at_least>\n", *ostr);
}
}
@ -228,13 +232,13 @@ void XMLRowOutputFormat::writeStatistics()
{
writeCString("\t<statistics>\n", *ostr);
writeCString("\t\t<elapsed>", *ostr);
writeText(watch.elapsedSeconds(), *ostr);
writeText(statistics.watch.elapsedSeconds(), *ostr);
writeCString("</elapsed>\n", *ostr);
writeCString("\t\t<rows_read>", *ostr);
writeText(progress.read_rows.load(), *ostr);
writeText(statistics.progress.read_rows.load(), *ostr);
writeCString("</rows_read>\n", *ostr);
writeCString("\t\t<bytes_read>", *ostr);
writeText(progress.read_bytes.load(), *ostr);
writeText(statistics.progress.read_bytes.load(), *ostr);
writeCString("</bytes_read>\n", *ostr);
writeCString("\t</statistics>\n", *ostr);
}
@ -250,6 +254,8 @@ void registerOutputFormatXML(FormatFactory & factory)
{
return std::make_shared<XMLRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("XML");
}
}

View File

@ -26,7 +26,7 @@ private:
void writeRowEndDelimiter() override;
void writePrefix() override;
void writeSuffix() override;
void writeLastSuffix() override;
void finalizeImpl() override;
void writeMinExtreme(const Columns & columns, size_t row_num) override;
void writeMaxExtreme(const Columns & columns, size_t row_num) override;
@ -47,10 +47,12 @@ private:
void setRowsBeforeLimit(size_t rows_before_limit_) override
{
applied_limit = true;
rows_before_limit = rows_before_limit_;
statistics.applied_limit = true;
statistics.rows_before_limit = rows_before_limit_;
}
void onFirstRowNumberUpdate() override { row_count = getFirstRowNumber(); }
void onProgress(const Progress & value) override;
String getContentType() const override { return "application/xml; charset=UTF-8"; }
@ -64,13 +66,10 @@ private:
size_t field_number = 0;
size_t row_count = 0;
bool applied_limit = false;
size_t rows_before_limit = 0;
NamesAndTypes fields;
Names field_tag_names;
Progress progress;
Stopwatch watch;
Statistics statistics;
const FormatSettings format_settings;
};