Merge pull request #31489 from Avogar/parallel-formatting

Support parallel formatting for almost all text formats
This commit is contained in:
Kruglov Pavel 2021-11-26 15:21:22 +03:00 committed by GitHub
commit af998af710
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 379 additions and 120 deletions

View File

@ -72,7 +72,7 @@ void IOutputFormat::work()
if (rows_before_limit_counter && rows_before_limit_counter->hasAppliedLimit())
setRowsBeforeLimit(rows_before_limit_counter->get());
finalizeImpl();
finalize();
finalized = true;
return;
}
@ -85,10 +85,15 @@ void IOutputFormat::work()
consume(std::move(current_chunk));
break;
case Totals:
writeSuffixIfNot();
if (auto totals = prepareTotals(std::move(current_chunk)))
{
consumeTotals(std::move(totals));
are_totals_written = true;
}
break;
case Extremes:
writeSuffixIfNot();
consumeExtremes(std::move(current_chunk));
break;
}
@ -116,6 +121,7 @@ void IOutputFormat::write(const Block & block)
/// Finish the output. The prefix and the data suffix are written first through the
/// *IfNot helpers (writeSuffixIfNot() calls writeSuffix() only while need_write_suffix is set;
/// writePrefixIfNot() is presumably guarded the same way by need_write_prefix),
/// and then the format-specific finalizeImpl() is called.
void IOutputFormat::finalize()
{
writePrefixIfNot();
writeSuffixIfNot();
finalizeImpl();
}

View File

@ -4,7 +4,7 @@
#include <Processors/IProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <IO/Progress.h>
#include <Common/Stopwatch.h>
namespace DB
{
@ -59,8 +59,17 @@ public:
virtual bool expectMaterializedColumns() const { return true; }
void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); }
void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); }
/// Pass totals given as a Block to the format.
/// The data suffix is written first (if it has not been written yet), then the block is
/// converted to a Chunk and handed to consumeTotals(), and the format remembers
/// that totals were written.
void setTotals(const Block & totals)
{
writeSuffixIfNot();
consumeTotals(Chunk(totals.getColumns(), totals.rows()));
are_totals_written = true;
}
/// Pass extremes given as a Block to the format.
/// The data suffix is written first (if it has not been written yet), then the block is
/// converted to a Chunk and handed to consumeExtremes().
void setExtremes(const Block & extremes)
{
writeSuffixIfNot();
consumeExtremes(Chunk(extremes.getColumns(), extremes.rows()));
}
size_t getResultRows() const { return result_rows; }
size_t getResultBytes() const { return result_bytes; }
@ -75,6 +84,7 @@ protected:
virtual void consumeExtremes(Chunk) {}
virtual void finalizeImpl() {}
virtual void writePrefix() {}
virtual void writeSuffix() {}
void writePrefixIfNot()
{
@ -85,6 +95,56 @@ protected:
}
}
/// Call writeSuffix() unless the suffix has already been written.
/// The need_write_suffix flag is cleared only after writeSuffix() has returned.
void writeSuffixIfNot()
{
    if (!need_write_suffix)
        return;

    writeSuffix();
    need_write_suffix = false;
}
/// Methods-helpers for parallel formatting.
/// Set the number of rows that were already read in
/// parallel formatting before this formatter was created.
/// onRowsReadBeforeUpdate() is called afterwards so that a derived format
/// can update its own state according to the new value.
void setRowsReadBefore(size_t first_row_number_)
{
rows_read_before = first_row_number_;
onRowsReadBeforeUpdate();
}
size_t getRowsReadBefore() const { return rows_read_before; }
/// Update state according to new rows_read_before.
virtual void onRowsReadBeforeUpdate() {}
/// Some formats output some statistics after the data.
/// In parallel formatting we collect these statistics outside the
/// underlying format and then pass them to the format before finalizing.
struct Statistics
{
/// Source of the "elapsed" value printed by formats that write statistics.
Stopwatch watch;
/// Accumulated progress; formats print its read_rows / read_bytes counters.
Progress progress;
/// Formats print rows_before_limit only when applied_limit is true.
bool applied_limit = false;
size_t rows_before_limit = 0;
/// Totals and extremes saved for formats that output them while finalizing.
Chunk totals;
Chunk extremes;
};
void setOutsideStatistics(Statistics statistics_) { statistics = std::make_shared<Statistics>(std::move(statistics_)); }
std::shared_ptr<Statistics> getOutsideStatistics() const { return statistics; }
/// In some formats the way we print extremes depends on
/// whether totals were printed or not. In this case, in parallel formatting
/// we should notify the underlying format if totals were printed.
void setTotalsAreWritten() { are_totals_written = true; }
bool areTotalsWritten() const { return are_totals_written; }
/// Return true if format saves totals and extremes in consumeTotals/consumeExtremes and
/// outputs them in finalize() method.
virtual bool areTotalsAndExtremesUsedInFinalize() const { return false; }
WriteBuffer & out;
Chunk current_chunk;
@ -95,11 +155,17 @@ protected:
/// Flush data on each consumed chunk. This is intended for interactive applications to output data as soon as it's ready.
bool auto_flush = false;
bool need_write_prefix = true;
bool need_write_suffix = true;
RowsBeforeLimitCounterPtr rows_before_limit_counter;
private:
size_t rows_read_before = 0;
std::shared_ptr<Statistics> statistics = nullptr;
bool are_totals_written = false;
/// Counters for consumed chunks. Are used for QueryLog.
size_t result_rows = 0;
size_t result_bytes = 0;

View File

@ -27,7 +27,7 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
for (size_t row = 0; row < num_rows; ++row)
{
if (!first_row)
if (!first_row || getRowsReadBefore() != 0)
writeRowBetweenDelimiter();
write(columns, row);
@ -41,8 +41,6 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
void IRowOutputFormat::consumeTotals(DB::Chunk chunk)
{
writeSuffixIfNot();
auto num_rows = chunk.getNumRows();
if (num_rows != 1)
throw Exception("Got " + toString(num_rows) + " in totals chunk, expected 1", ErrorCodes::LOGICAL_ERROR);
@ -56,8 +54,6 @@ void IRowOutputFormat::consumeTotals(DB::Chunk chunk)
void IRowOutputFormat::consumeExtremes(DB::Chunk chunk)
{
writeSuffixIfNot();
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
if (num_rows != 2)
@ -70,12 +66,6 @@ void IRowOutputFormat::consumeExtremes(DB::Chunk chunk)
writeAfterExtremes();
}
void IRowOutputFormat::finalizeImpl()
{
writeSuffixIfNot();
writeLastSuffix();
}
void IRowOutputFormat::write(const Columns & columns, size_t row_num)
{
size_t num_columns = columns.size();

View File

@ -31,7 +31,6 @@ protected:
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override;
void consumeExtremes(Chunk chunk) override;
void finalizeImpl() override;
/** Write a row.
* Default implementation calls methods to write single values and delimiters
@ -51,29 +50,18 @@ protected:
virtual void writeRowEndDelimiter() {} /// delimiter after each row
virtual void writeRowBetweenDelimiter() {} /// delimiter between rows
virtual void writePrefix() override {} /// delimiter before resultset
virtual void writeSuffix() {} /// delimiter after resultset
virtual void writeSuffix() override {} /// delimiter after resultset
virtual void writeBeforeTotals() {}
virtual void writeAfterTotals() {}
virtual void writeBeforeExtremes() {}
virtual void writeAfterExtremes() {}
virtual void writeLastSuffix() {} /// Write something after resultset, totals and extremes.
virtual void finalizeImpl() override {} /// Write something after resultset, totals and extremes.
DataTypes types;
Serializations serializations;
Params params;
bool first_row = true;
private:
void writeSuffixIfNot()
{
if (!suffix_written)
writeSuffix();
suffix_written = true;
}
bool suffix_written = false;
};
}

View File

@ -60,6 +60,7 @@ void registerOutputFormatRowBinary(FormatFactory & factory)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, with_names, with_types, params);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};
registerWithNamesAndTypes("RowBinary", register_func);

View File

@ -89,6 +89,8 @@ void registerOutputFormatCustomSeparated(FormatFactory & factory)
{
return std::make_shared<CustomSeparatedRowOutputFormat>(sample, buf, params, settings, with_names, with_types);
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
};
registerWithNamesAndTypes("CustomSeparated", register_func);

View File

@ -99,6 +99,8 @@ void registerOutputFormatJSONCompact(FormatFactory & factory)
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, false);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompact");
factory.registerOutputFormat("JSONCompactStrings", [](
WriteBuffer & buf,
const Block & sample,
@ -107,6 +109,8 @@ void registerOutputFormatJSONCompact(FormatFactory & factory)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, true);
});
factory.markOutputFormatSupportsParallelFormatting("JSONCompactStrings");
}
}

View File

@ -217,12 +217,16 @@ void JSONRowOutputFormat::writeAfterExtremes()
writeCString("\t}", *ostr);
}
void JSONRowOutputFormat::writeLastSuffix()
void JSONRowOutputFormat::finalizeImpl()
{
writeCString(",\n\n", *ostr);
writeCString("\t\"rows\": ", *ostr);
writeIntText(row_count, *ostr);
auto outside_statistics = getOutsideStatistics();
if (outside_statistics)
statistics = std::move(*outside_statistics);
writeRowsBeforeLimitAtLeast();
if (settings.write_statistics)
@ -235,11 +239,11 @@ void JSONRowOutputFormat::writeLastSuffix()
void JSONRowOutputFormat::writeRowsBeforeLimitAtLeast()
{
if (applied_limit)
if (statistics.applied_limit)
{
writeCString(",\n\n", *ostr);
writeCString("\t\"rows_before_limit_at_least\": ", *ostr);
writeIntText(rows_before_limit, *ostr);
writeIntText(statistics.rows_before_limit, *ostr);
}
}
@ -250,13 +254,13 @@ void JSONRowOutputFormat::writeStatistics()
writeCString("\t{\n", *ostr);
writeCString("\t\t\"elapsed\": ", *ostr);
writeText(watch.elapsedSeconds(), *ostr);
writeText(statistics.watch.elapsedSeconds(), *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\"rows_read\": ", *ostr);
writeText(progress.read_rows.load(), *ostr);
writeText(statistics.progress.read_rows.load(), *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\"bytes_read\": ", *ostr);
writeText(progress.read_bytes.load(), *ostr);
writeText(statistics.progress.read_bytes.load(), *ostr);
writeChar('\n', *ostr);
writeCString("\t}", *ostr);
@ -264,7 +268,7 @@ void JSONRowOutputFormat::writeStatistics()
void JSONRowOutputFormat::onProgress(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
statistics.progress.incrementPiecewiseAtomically(value);
}
@ -279,6 +283,8 @@ void registerOutputFormatJSON(FormatFactory & factory)
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, false);
});
factory.markOutputFormatSupportsParallelFormatting("JSON");
factory.registerOutputFormat("JSONStrings", [](
WriteBuffer & buf,
const Block & sample,
@ -287,6 +293,8 @@ void registerOutputFormatJSON(FormatFactory & factory)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, true);
});
factory.markOutputFormatSupportsParallelFormatting("JSONStrings");
}
}

View File

@ -39,8 +39,8 @@ public:
void setRowsBeforeLimit(size_t rows_before_limit_) override
{
applied_limit = true;
rows_before_limit = rows_before_limit_;
statistics.applied_limit = true;
statistics.rows_before_limit = rows_before_limit_;
}
protected:
@ -61,7 +61,7 @@ protected:
void writeBeforeExtremes() override;
void writeAfterExtremes() override;
void writeLastSuffix() override;
void finalizeImpl() override;
virtual void writeTotalsField(const IColumn & column, const ISerialization & serialization, size_t row_num);
virtual void writeExtremesElement(const char * title, const Columns & columns, size_t row_num);
@ -70,17 +70,16 @@ protected:
void writeRowsBeforeLimitAtLeast();
void writeStatistics();
void onRowsReadBeforeUpdate() override { row_count = getRowsReadBefore(); }
std::unique_ptr<WriteBuffer> validating_ostr; /// Validates UTF-8 sequences, replaces bad sequences with replacement character.
WriteBuffer * ostr;
size_t field_number = 0;
size_t row_count = 0;
bool applied_limit = false;
size_t rows_before_limit = 0;
NamesAndTypes fields;
Progress progress;
Stopwatch watch;
Statistics statistics;
FormatSettings settings;
bool yield_strings;

View File

@ -65,6 +65,8 @@ void registerOutputFormatMarkdown(FormatFactory & factory)
{
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Markdown");
}
}

View File

@ -53,9 +53,17 @@ namespace DB
unit.segment.resize(0);
unit.status = READY_TO_FORMAT;
unit.type = type;
if (type == ProcessingUnitType::FINALIZE)
{
std::lock_guard lock(statistics_mutex);
unit.statistics = std::move(statistics);
}
scheduleFormatterThreadForUnitWithNumber(current_unit_number);
size_t first_row_num = rows_consumed;
if (unit.type == ProcessingUnitType::PLAIN)
rows_consumed += unit.chunk.getNumRows();
scheduleFormatterThreadForUnitWithNumber(current_unit_number, first_row_num);
++writer_unit_number;
}
@ -144,7 +152,7 @@ namespace DB
}
void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number, const ThreadGroupStatusPtr & thread_group)
void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number, size_t first_row_num, const ThreadGroupStatusPtr & thread_group)
{
setThreadName("Formatter");
if (thread_group)
@ -166,6 +174,7 @@ namespace DB
unit.segment.resize(0);
auto formatter = internal_formatter_creator(out_buffer);
formatter->setRowsReadBefore(first_row_num);
switch (unit.type)
{
@ -179,6 +188,11 @@ namespace DB
formatter->consume(std::move(unit.chunk));
break;
}
case ProcessingUnitType::PLAIN_FINISH :
{
formatter->writeSuffix();
break;
}
case ProcessingUnitType::TOTALS :
{
formatter->consumeTotals(std::move(unit.chunk));
@ -186,11 +200,14 @@ namespace DB
}
case ProcessingUnitType::EXTREMES :
{
if (are_totals_written)
formatter->setTotalsAreWritten();
formatter->consumeExtremes(std::move(unit.chunk));
break;
}
case ProcessingUnitType::FINALIZE :
{
formatter->setOutsideStatistics(std::move(unit.statistics));
formatter->finalizeImpl();
break;
}

View File

@ -4,6 +4,7 @@
#include <Common/Arena.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <base/logger_useful.h>
#include <Common/Exception.h>
#include "IO/WriteBufferFromString.h"
@ -11,6 +12,7 @@
#include <Poco/Event.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <IO/NullWriteBuffer.h>
#include <deque>
#include <atomic>
@ -79,6 +81,10 @@ public:
{
collectorThreadFunction(thread_group);
});
NullWriteBuffer buf;
save_totals_and_extremes_in_statistics = internal_formatter_creator(buf)->areTotalsAndExtremesUsedInFinalize();
LOG_TRACE(&Poco::Logger::get("ParallelFormattingOutputFormat"), "Parallel formatting is being used");
}
@ -104,8 +110,16 @@ public:
finishAndWait();
}
/// There are no formats which support parallel formatting and progress writing at the same time
void onProgress(const Progress &) override {}
/// Accumulate progress into the collected statistics.
/// The mutex is taken because onProgress() can be called from different threads.
void onProgress(const Progress & value) override
{
std::lock_guard lock(statistics_mutex);
statistics.progress.incrementPiecewiseAtomically(value);
}
/// Nothing is written directly here: an empty PLAIN_FINISH unit is enqueued instead,
/// and the formatter thread handles such a unit by calling writeSuffix() on its internal formatter.
void writeSuffix() override
{
addChunk(Chunk{}, ProcessingUnitType::PLAIN_FINISH, /*can_throw_exception*/ true);
}
String getContentType() const override
{
@ -121,12 +135,29 @@ private:
void consumeTotals(Chunk totals) override
{
addChunk(std::move(totals), ProcessingUnitType::TOTALS, /*can_throw_exception*/ true);
if (save_totals_and_extremes_in_statistics)
{
std::lock_guard lock(statistics_mutex);
statistics.totals = std::move(totals);
}
else
{
addChunk(std::move(totals), ProcessingUnitType::TOTALS, /*can_throw_exception*/ true);
are_totals_written = true;
}
}
void consumeExtremes(Chunk extremes) override
{
addChunk(std::move(extremes), ProcessingUnitType::EXTREMES, /*can_throw_exception*/ true);
if (save_totals_and_extremes_in_statistics)
{
std::lock_guard lock(statistics_mutex);
statistics.extremes = std::move(extremes);
}
else
{
addChunk(std::move(extremes), ProcessingUnitType::EXTREMES, /*can_throw_exception*/ true);
}
}
void finalizeImpl() override;
@ -146,9 +177,10 @@ private:
{
START,
PLAIN,
PLAIN_FINISH,
TOTALS,
EXTREMES,
FINALIZE
FINALIZE,
};
void addChunk(Chunk chunk, ProcessingUnitType type, bool can_throw_exception);
@ -160,6 +192,7 @@ private:
Chunk chunk;
Memory<> segment;
size_t actual_memory_size{0};
Statistics statistics;
};
Poco::Event collector_finished{};
@ -186,6 +219,14 @@ private:
std::condition_variable collector_condvar;
std::condition_variable writer_condvar;
size_t rows_consumed = 0;
std::atomic_bool are_totals_written = false;
Statistics statistics;
/// We change statistics in onProgress() which can be called from different threads.
std::mutex statistics_mutex;
bool save_totals_and_extremes_in_statistics;
void finishAndWait();
void onBackgroundException()
@ -200,11 +241,11 @@ private:
collector_condvar.notify_all();
}
void scheduleFormatterThreadForUnitWithNumber(size_t ticket_number)
void scheduleFormatterThreadForUnitWithNumber(size_t ticket_number, size_t first_row_num)
{
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup(), ticket_number]
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup(), ticket_number, first_row_num]
{
formatterThreadFunction(ticket_number, thread_group);
formatterThreadFunction(ticket_number, first_row_num, thread_group);
});
}
@ -212,7 +253,14 @@ private:
void collectorThreadFunction(const ThreadGroupStatusPtr & thread_group);
/// This function is executed in ThreadPool and the only purpose of it is to format one Chunk into a continuous buffer in memory.
void formatterThreadFunction(size_t current_unit_number, const ThreadGroupStatusPtr & thread_group);
void formatterThreadFunction(size_t current_unit_number, size_t first_row_num, const ThreadGroupStatusPtr & thread_group);
/// Remember the rows-before-limit value in the collected statistics,
/// which are handed over to the FINALIZE unit later.
void setRowsBeforeLimit(size_t rows_before_limit) override
{
    /// statistics is also changed in onProgress(), so it is modified under statistics_mutex.
    std::lock_guard<std::mutex> guard(statistics_mutex);
    statistics.applied_limit = true;
    statistics.rows_before_limit = rows_before_limit;
}
};
}

View File

@ -377,7 +377,6 @@ void PrettyBlockOutputFormat::consume(Chunk chunk)
void PrettyBlockOutputFormat::consumeTotals(Chunk chunk)
{
total_rows = 0;
writeSuffixIfNot();
writeCString("\nTotals:\n", out);
write(chunk, PortKind::Totals);
}
@ -385,7 +384,6 @@ void PrettyBlockOutputFormat::consumeTotals(Chunk chunk)
void PrettyBlockOutputFormat::consumeExtremes(Chunk chunk)
{
total_rows = 0;
writeSuffixIfNot();
writeCString("\nExtremes:\n", out);
write(chunk, PortKind::Extremes);
}
@ -401,11 +399,6 @@ void PrettyBlockOutputFormat::writeSuffix()
}
}
void PrettyBlockOutputFormat::finalizeImpl()
{
writeSuffixIfNot();
}
void registerOutputFormatPretty(FormatFactory & factory)
{
@ -418,6 +411,8 @@ void registerOutputFormatPretty(FormatFactory & factory)
return std::make_shared<PrettyBlockOutputFormat>(buf, sample, format_settings);
});
factory.markOutputFormatSupportsParallelFormatting("Pretty");
factory.registerOutputFormat("PrettyNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
@ -428,6 +423,8 @@ void registerOutputFormatPretty(FormatFactory & factory)
changed_settings.pretty.color = false;
return std::make_shared<PrettyBlockOutputFormat>(buf, sample, changed_settings);
});
factory.markOutputFormatSupportsParallelFormatting("PrettyNoEscapes");
}
}

View File

@ -27,11 +27,8 @@ protected:
void consumeTotals(Chunk) override;
void consumeExtremes(Chunk) override;
void finalizeImpl() override;
size_t total_rows = 0;
size_t terminal_width = 0;
bool suffix_written = false;
size_t row_number_width = 7; // "10000. "
@ -41,16 +38,9 @@ protected:
using WidthsPerColumn = std::vector<Widths>;
virtual void write(const Chunk & chunk, PortKind port_kind);
virtual void writeSuffix();
void writeSuffix() override;
virtual void writeSuffixIfNot()
{
if (!suffix_written)
writeSuffix();
suffix_written = true;
}
void onRowsReadBeforeUpdate() override { total_rows = getRowsReadBefore(); }
void calculateWidths(
const Block & header, const Chunk & chunk,

View File

@ -54,7 +54,7 @@ PrettyCompactBlockOutputFormat::PrettyCompactBlockOutputFormat(WriteBuffer & out
{
}
void PrettyCompactBlockOutputFormat::writeSuffixIfNot()
void PrettyCompactBlockOutputFormat::writeSuffix()
{
if (mono_chunk)
{
@ -62,7 +62,7 @@ void PrettyCompactBlockOutputFormat::writeSuffixIfNot()
mono_chunk.clear();
}
PrettyBlockOutputFormat::writeSuffixIfNot();
PrettyBlockOutputFormat::writeSuffix();
}
void PrettyCompactBlockOutputFormat::writeHeader(
@ -218,7 +218,7 @@ void PrettyCompactBlockOutputFormat::write(const Chunk & chunk, PortKind port_ki
}
else
{
/// Should be written from writeSuffixIfNot()
/// Should be written from writeSuffix()
assert(!mono_chunk);
}
}
@ -269,6 +269,8 @@ void registerOutputFormatPrettyCompact(FormatFactory & factory)
});
}
factory.markOutputFormatSupportsParallelFormatting("PrettyCompact");
factory.registerOutputFormat("PrettyCompactNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
@ -279,6 +281,7 @@ void registerOutputFormatPrettyCompact(FormatFactory & factory)
changed_settings.pretty.color = false;
return std::make_shared<PrettyCompactBlockOutputFormat>(buf, sample, changed_settings, false /* mono_block */);
});
factory.markOutputFormatSupportsParallelFormatting("PrettyCompactNoEscapes");
}
}

View File

@ -33,7 +33,7 @@ private:
Chunk mono_chunk;
void writeChunk(const Chunk & chunk, PortKind port_kind);
void writeSuffixIfNot() override;
void writeSuffix() override;
};
}

View File

@ -124,6 +124,8 @@ void registerOutputFormatPrettySpace(FormatFactory & factory)
return std::make_shared<PrettySpaceBlockOutputFormat>(buf, sample, format_settings);
});
factory.markOutputFormatSupportsParallelFormatting("PrettySpace");
factory.registerOutputFormat("PrettySpaceNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
@ -134,6 +136,8 @@ void registerOutputFormatPrettySpace(FormatFactory & factory)
changed_settings.pretty.color = false;
return std::make_shared<PrettySpaceBlockOutputFormat>(buf, sample, changed_settings);
});
factory.markOutputFormatSupportsParallelFormatting("PrettySpaceNoEscapes");
}
}

View File

@ -144,6 +144,9 @@ void TemplateBlockOutputFormat::finalizeImpl()
return;
size_t parts = format.format_idx_to_column_idx.size();
auto outside_statistics = getOutsideStatistics();
if (outside_statistics)
statistics = std::move(*outside_statistics);
for (size_t i = 0; i < parts; ++i)
{
@ -152,36 +155,36 @@ void TemplateBlockOutputFormat::finalizeImpl()
switch (static_cast<ResultsetPart>(*format.format_idx_to_column_idx[i]))
{
case ResultsetPart::Totals:
if (!totals || !totals.hasRows())
if (!statistics.totals || !statistics.totals.hasRows())
format.throwInvalidFormat("Cannot print totals for this request", i);
writeRow(totals, 0);
writeRow(statistics.totals, 0);
break;
case ResultsetPart::ExtremesMin:
if (!extremes)
if (!statistics.extremes)
format.throwInvalidFormat("Cannot print extremes for this request", i);
writeRow(extremes, 0);
writeRow(statistics.extremes, 0);
break;
case ResultsetPart::ExtremesMax:
if (!extremes)
if (!statistics.extremes)
format.throwInvalidFormat("Cannot print extremes for this request", i);
writeRow(extremes, 1);
writeRow(statistics.extremes, 1);
break;
case ResultsetPart::Rows:
writeValue<size_t, DataTypeUInt64>(row_count, format.escaping_rules[i]);
break;
case ResultsetPart::RowsBeforeLimit:
if (!rows_before_limit_set)
if (!statistics.applied_limit)
format.throwInvalidFormat("Cannot print rows_before_limit for this request", i);
writeValue<size_t, DataTypeUInt64>(rows_before_limit, format.escaping_rules[i]);
writeValue<size_t, DataTypeUInt64>(statistics.rows_before_limit, format.escaping_rules[i]);
break;
case ResultsetPart::TimeElapsed:
writeValue<double, DataTypeFloat64>(watch.elapsedSeconds(), format.escaping_rules[i]);
writeValue<double, DataTypeFloat64>(statistics.watch.elapsedSeconds(), format.escaping_rules[i]);
break;
case ResultsetPart::RowsRead:
writeValue<size_t, DataTypeUInt64>(progress.read_rows.load(), format.escaping_rules[i]);
writeValue<size_t, DataTypeUInt64>(statistics.progress.read_rows.load(), format.escaping_rules[i]);
break;
case ResultsetPart::BytesRead:
writeValue<size_t, DataTypeUInt64>(progress.read_bytes.load(), format.escaping_rules[i]);
writeValue<size_t, DataTypeUInt64>(statistics.progress.read_bytes.load(), format.escaping_rules[i]);
break;
default:
break;

View File

@ -20,8 +20,8 @@ public:
String getName() const override { return "TemplateBlockOutputFormat"; }
void setRowsBeforeLimit(size_t rows_before_limit_) override { rows_before_limit = rows_before_limit_; rows_before_limit_set = true; }
void onProgress(const Progress & progress_) override { progress.incrementPiecewiseAtomically(progress_); }
void setRowsBeforeLimit(size_t rows_before_limit_) override { statistics.rows_before_limit = rows_before_limit_; statistics.applied_limit = true; }
void onProgress(const Progress & progress_) override { statistics.progress.incrementPiecewiseAtomically(progress_); }
enum class ResultsetPart : size_t
{
@ -41,25 +41,23 @@ public:
private:
void writePrefix() override;
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
void consumeTotals(Chunk chunk) override { statistics.totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { statistics.extremes = std::move(chunk); }
void finalizeImpl() override;
void writeRow(const Chunk & chunk, size_t row_num);
template <typename U, typename V> void writeValue(U value, EscapingRule escaping_rule);
void onRowsReadBeforeUpdate() override { row_count = getRowsReadBefore(); }
bool areTotalsAndExtremesUsedInFinalize() const override { return true; }
const FormatSettings settings;
Serializations serializations;
ParsedTemplateFormatString format;
ParsedTemplateFormatString row_format;
size_t rows_before_limit = 0;
bool rows_before_limit_set = false;
Chunk totals;
Chunk extremes;
Progress progress;
Stopwatch watch;
Statistics statistics;
size_t row_count = 0;

View File

@ -51,6 +51,8 @@ void registerOutputFormatValues(FormatFactory & factory)
{
return std::make_shared<ValuesRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Values");
}
}

View File

@ -115,7 +115,7 @@ void VerticalRowOutputFormat::writeBeforeTotals()
void VerticalRowOutputFormat::writeBeforeExtremes()
{
if (!was_totals_written)
if (!areTotalsWritten())
writeCString("\n", out);
writeCString("\n", out);
@ -134,7 +134,6 @@ void VerticalRowOutputFormat::writeMaxExtreme(const Columns & columns, size_t ro
void VerticalRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
writeSpecialRow(columns, row_num, "Totals");
was_totals_written = true;
}
void VerticalRowOutputFormat::writeSpecialRow(const Columns & columns, size_t row_num, const char * title)
@ -153,12 +152,7 @@ void VerticalRowOutputFormat::writeSpecialRow(const Columns & columns, size_t ro
writeChar('\n', out);
for (size_t i = 0; i < num_columns; ++i)
{
if (i != 0)
writeFieldDelimiter();
writeField(*columns[i], *serializations[i], row_num);
}
}
void registerOutputFormatVertical(FormatFactory & factory)
@ -171,6 +165,8 @@ void registerOutputFormatVertical(FormatFactory & factory)
{
return std::make_shared<VerticalRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("Vertical");
}
}

View File

@ -37,13 +37,14 @@ private:
void writeValue(const IColumn & column, const ISerialization & serialization, size_t row_num) const;
void onRowsReadBeforeUpdate() override { row_number = getRowsReadBefore(); }
/// For totals and extremes.
void writeSpecialRow(const Columns & columns, size_t row_num, const char * title);
const FormatSettings format_settings;
size_t field_number = 0;
size_t row_number = 0;
bool was_totals_written = false;
using NamesAndPaddings = std::vector<String>;
NamesAndPaddings names_and_paddings;

View File

@ -195,16 +195,20 @@ void XMLRowOutputFormat::writeExtremesElement(const char * title, const Columns
void XMLRowOutputFormat::onProgress(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
statistics.progress.incrementPiecewiseAtomically(value);
}
void XMLRowOutputFormat::writeLastSuffix()
void XMLRowOutputFormat::finalizeImpl()
{
writeCString("\t<rows>", *ostr);
writeIntText(row_count, *ostr);
writeCString("</rows>\n", *ostr);
auto outside_statistics = getOutsideStatistics();
if (outside_statistics)
statistics = std::move(*outside_statistics);
writeRowsBeforeLimitAtLeast();
if (format_settings.write_statistics)
@ -216,10 +220,10 @@ void XMLRowOutputFormat::writeLastSuffix()
void XMLRowOutputFormat::writeRowsBeforeLimitAtLeast()
{
if (applied_limit)
if (statistics.applied_limit)
{
writeCString("\t<rows_before_limit_at_least>", *ostr);
writeIntText(rows_before_limit, *ostr);
writeIntText(statistics.rows_before_limit, *ostr);
writeCString("</rows_before_limit_at_least>\n", *ostr);
}
}
@ -228,13 +232,13 @@ void XMLRowOutputFormat::writeStatistics()
{
writeCString("\t<statistics>\n", *ostr);
writeCString("\t\t<elapsed>", *ostr);
writeText(watch.elapsedSeconds(), *ostr);
writeText(statistics.watch.elapsedSeconds(), *ostr);
writeCString("</elapsed>\n", *ostr);
writeCString("\t\t<rows_read>", *ostr);
writeText(progress.read_rows.load(), *ostr);
writeText(statistics.progress.read_rows.load(), *ostr);
writeCString("</rows_read>\n", *ostr);
writeCString("\t\t<bytes_read>", *ostr);
writeText(progress.read_bytes.load(), *ostr);
writeText(statistics.progress.read_bytes.load(), *ostr);
writeCString("</bytes_read>\n", *ostr);
writeCString("\t</statistics>\n", *ostr);
}
@ -250,6 +254,8 @@ void registerOutputFormatXML(FormatFactory & factory)
{
return std::make_shared<XMLRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("XML");
}
}

View File

@ -26,7 +26,7 @@ private:
void writeRowEndDelimiter() override;
void writePrefix() override;
void writeSuffix() override;
void writeLastSuffix() override;
void finalizeImpl() override;
void writeMinExtreme(const Columns & columns, size_t row_num) override;
void writeMaxExtreme(const Columns & columns, size_t row_num) override;
@ -47,10 +47,12 @@ private:
void setRowsBeforeLimit(size_t rows_before_limit_) override
{
applied_limit = true;
rows_before_limit = rows_before_limit_;
statistics.applied_limit = true;
statistics.rows_before_limit = rows_before_limit_;
}
void onRowsReadBeforeUpdate() override { row_count = getRowsReadBefore(); }
void onProgress(const Progress & value) override;
String getContentType() const override { return "application/xml; charset=UTF-8"; }
@ -64,13 +66,10 @@ private:
size_t field_number = 0;
size_t row_count = 0;
bool applied_limit = false;
size_t rows_before_limit = 0;
NamesAndTypes fields;
Names field_tag_names;
Progress progress;
Stopwatch watch;
Statistics statistics;
const FormatSettings format_settings;
};

View File

@ -0,0 +1,68 @@
RowBinary-1
RowBinary-2
RowBinaryWithNames-1
RowBinaryWithNames-2
RowBinaryWithNamesAndTypes-1
RowBinaryWithNamesAndTypes-2
XML-1
XML-2
Markdown-1
Markdown-2
Vertical-1
Vertical-2
Values-1
Values-2
JSONEachRow-1
JSONEachRow-2
JSONStringsEachRow-1
JSONStringsEachRow-2
TSKV-1
TSKV-2
Pretty-1
Pretty-2
PrettyNoEscapes-1
PrettyNoEscapes-2
JSON-1
JSON-2
JSONStrings-1
JSONStrings-2
JSONCompact-1
JSONCompact-2
JSONCompactStrings-1
JSONCompactStrings-2
PrettyCompact-1
PrettyCompact-2
PrettyCompactNoEscapes-1
PrettyCompactNoEscapes-2
PrettySpace-1
PrettySpace-2
PrettySpaceNoEscapes-1
PrettySpaceNoEscapes-2
TSV-1
TSV-2
TSVWithNames-1
TSVWithNames-2
TSVWithNamesAndTypes-1
TSVWithNamesAndTypes-2
CSV-1
CSV-2
CSVWithNames-1
CSVWithNames-2
CSVWithNamesAndTypes-1
CSVWithNamesAndTypes-2
JSONCompactEachRow-1
JSONCompactEachRow-2
JSONCompactEachRowWithNames-1
JSONCompactEachRowWithNames-2
JSONCompactEachRowWithNamesAndTypes-1
JSONCompactEachRowWithNamesAndTypes-2
JSONCompactStringsEachRow-1
JSONCompactStringsEachRow-2
JSONCompactStringsEachRowWithNames-1
JSONCompactStringsEachRowWithNames-2
JSONCompactStringsEachRowWithNamesAndTypes-1
JSONCompactStringsEachRowWithNamesAndTypes-2
CustomSeparated-1
CustomSeparated-2
Template-1
Template-2

View File

@ -0,0 +1,61 @@
#!/usr/bin/env bash
# Checks that each listed output format produces the same bytes with
# output_format_parallel_formatting=0 and =1.
#
# For every format two queries are run:
#   <format>-1: a plain select over numbers(200000);
#   <format>-2: the same select with GROUP BY ... WITH TOTALS, LIMIT and --extremes=1,
#               so totals / extremes / rows-before-limit sections are exercised too.
# Each query's output is captured once per setting and the two files are diffed;
# the script itself prints only the "<format>-N" markers unless the outputs differ.

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

# Scratch files holding the two outputs being compared. All later uses are quoted
# so that a checkout path containing spaces or glob characters does not break them.
parallel_file="$CUR_DIR/${CLICKHOUSE_TEST_UNIQUE_NAME}_parallel"
non_parallel_file="$CUR_DIR/${CLICKHOUSE_TEST_UNIQUE_NAME}_non_parallel"

# Query bodies shared by the per-format loop and the CustomSeparated section.
plain_query="select number, number + 1, concat('string: ', toString(number)) from numbers(200000)"
totals_query="$plain_query group by number with totals limit 190000"

formats="RowBinary RowBinaryWithNames RowBinaryWithNamesAndTypes XML Markdown Vertical Values JSONEachRow JSONStringsEachRow TSKV Pretty PrettyNoEscapes JSON JSONStrings JSONCompact JSONCompactStrings PrettyCompact PrettyCompactNoEscapes PrettySpace PrettySpaceNoEscapes TSV TSVWithNames TSVWithNamesAndTypes CSV CSVWithNames CSVWithNamesAndTypes JSONCompactEachRow JSONCompactEachRowWithNames JSONCompactEachRowWithNamesAndTypes JSONCompactStringsEachRow JSONCompactStringsEachRowWithNames JSONCompactStringsEachRowWithNamesAndTypes"

# ${formats} is deliberately unquoted: it is a space-separated list to iterate over.
# $CLICKHOUSE_CLIENT is deliberately unquoted as well; presumably it expands to the
# client binary plus its arguments (it comes from shell_config.sh) — confirm.
for format in ${formats}; do
    # Lines containing "elapsed" are dropped before comparing
    # (presumably per-run timing statistics — confirm).
    echo "${format}-1"
    $CLICKHOUSE_CLIENT -q "$plain_query format $format" --output_format_parallel_formatting=0 --output_format_pretty_max_rows=1000000 | grep -v "elapsed" > "$non_parallel_file"
    $CLICKHOUSE_CLIENT -q "$plain_query format $format" --output_format_parallel_formatting=1 --output_format_pretty_max_rows=1000000 | grep -v "elapsed" > "$parallel_file"
    diff "$non_parallel_file" "$parallel_file"

    echo "${format}-2"
    $CLICKHOUSE_CLIENT -q "$totals_query format $format" --extremes=1 --output_format_parallel_formatting=0 --output_format_pretty_max_rows=1000000 | grep -v "elapsed" > "$non_parallel_file"
    $CLICKHOUSE_CLIENT -q "$totals_query format $format" --extremes=1 --output_format_parallel_formatting=1 --output_format_pretty_max_rows=1000000 | grep -v "elapsed" > "$parallel_file"
    diff "$non_parallel_file" "$parallel_file"
done

# CustomSeparated needs its delimiters supplied through a SETTINGS clause.
CUSTOM_SETTINGS="SETTINGS format_custom_row_before_delimiter='<row_before_delimiter>', format_custom_row_after_delimiter='<row_after_delimieter>\n', format_custom_row_between_delimiter='<row_between_delimiter>\n', format_custom_result_before_delimiter='<result_before_delimiter>\n', format_custom_result_after_delimiter='<result_after_delimiter>\n', format_custom_field_delimiter='<field_delimiter>', format_custom_escaping_rule='CSV'"

echo "CustomSeparated-1"
$CLICKHOUSE_CLIENT -q "$plain_query format CustomSeparated $CUSTOM_SETTINGS" --output_format_parallel_formatting=0 > "$non_parallel_file"
$CLICKHOUSE_CLIENT -q "$plain_query format CustomSeparated $CUSTOM_SETTINGS" --output_format_parallel_formatting=1 > "$parallel_file"
diff "$non_parallel_file" "$parallel_file"

echo "CustomSeparated-2"
$CLICKHOUSE_CLIENT -q "$totals_query format CustomSeparated $CUSTOM_SETTINGS" --output_format_parallel_formatting=0 --extremes=1 > "$non_parallel_file"
$CLICKHOUSE_CLIENT -q "$totals_query format CustomSeparated $CUSTOM_SETTINGS" --output_format_parallel_formatting=1 --extremes=1 > "$parallel_file"
diff "$non_parallel_file" "$parallel_file"

# Template reads its result-set and row layouts from files; they are written next
# to the test and removed at the end of the script.
resultset_template="$CUR_DIR/02122_template_format_resultset.tmp"
row_template="$CUR_DIR/02122_template_format_row.tmp"

# Template rows refer to columns by name, so these queries alias the columns x, y, s.
template_plain_query="select number as x, number + 1 as y, concat('string: ', toString(number)) as s from numbers(200000)"
template_totals_query="$template_plain_query group by number with totals limit 190000"

echo -ne '{prefix} \n${data}\n $$ suffix $$\n' > "$resultset_template"
echo -ne 'x:${x:Quoted}, y:${y:Quoted}, s:${s:Quoted}' > "$row_template"

TEMPLATE_SETTINGS="SETTINGS format_template_resultset = '$resultset_template', format_template_row = '$row_template', format_template_rows_between_delimiter = ';\n'"

echo "Template-1"
$CLICKHOUSE_CLIENT -q "$template_plain_query format Template $TEMPLATE_SETTINGS" --output_format_parallel_formatting=0 > "$non_parallel_file"
$CLICKHOUSE_CLIENT -q "$template_plain_query format Template $TEMPLATE_SETTINGS" --output_format_parallel_formatting=1 > "$parallel_file"
diff "$non_parallel_file" "$parallel_file"

# The second result-set template additionally prints totals, extremes and the
# row / byte counters.
echo -ne '{prefix} \n${data}\n $$ suffix $$\n${totals}\n${min}\n${max}\n${rows:Quoted}\n${rows_before_limit:Quoted}\n${rows_read:Quoted}\n${bytes_read:Quoted}\n' > "$resultset_template"

echo "Template-2"
$CLICKHOUSE_CLIENT -q "$template_totals_query format Template $TEMPLATE_SETTINGS" --output_format_parallel_formatting=0 --extremes=1 > "$non_parallel_file"
$CLICKHOUSE_CLIENT -q "$template_totals_query format Template $TEMPLATE_SETTINGS" --output_format_parallel_formatting=1 --extremes=1 > "$parallel_file"
diff "$non_parallel_file" "$parallel_file"

# Clean up everything the test created.
rm "$non_parallel_file" "$parallel_file"
rm "$resultset_template" "$row_template"