Fix quirks for PrettyCompactMonoBlock

- Fix PrettyCompactMonoBlock for clickhouse-local (broken in 20.3+,
  fails with an error `Unknown format PrettyCompactMonoBlock`, after #6239)
- Fix extremes/totals with PrettyCompactMonoBlock (even before 20.3
  breakage they were simply ignored)
This commit is contained in:
Azat Khuzhin 2020-08-06 01:43:23 +03:00
parent 3aba406f47
commit 15be6a0dd5
5 changed files with 71 additions and 40 deletions

View File

@ -10,7 +10,6 @@
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
@ -203,19 +202,6 @@ BlockInputStreamPtr FormatFactory::getInput(
BlockOutputStreamPtr FormatFactory::getOutput(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
{
if (name == "PrettyCompactMonoBlock")
{
/// TODO: rewrite
auto format = getOutputFormat("PrettyCompact", buf, sample, context);
auto res = std::make_shared<SquashingBlockOutputStream>(
std::make_shared<OutputStreamToOutputFormat>(format),
sample, context.getSettingsRef().output_format_pretty_max_rows, 0);
res->disableFlush();
return std::make_shared<MaterializingBlockOutputStream>(res, sample);
}
if (!getCreators(name).output_processor_creator)
{
const auto & output_getter = getCreators(name).output_creator;

View File

@ -134,7 +134,6 @@ public:
}
private:
/// FormatsDictionary dict;
FormatsDictionary dict;
FormatFactory();

View File

@ -42,7 +42,7 @@ protected:
virtual void writeSuffix();
void writeSuffixIfNot()
virtual void writeSuffixIfNot()
{
if (!suffix_written)
writeSuffix();

View File

@ -1,7 +1,6 @@
#include <Common/PODArray.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
///#include <DataStreams/SquashingBlockOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/Impl/PrettyCompactBlockOutputFormat.h>
@ -48,6 +47,21 @@ GridSymbols ascii_grid_symbols {
}
PrettyCompactBlockOutputFormat::PrettyCompactBlockOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_, bool mono_block_)
: PrettyBlockOutputFormat(out_, header, format_settings_)
, mono_block(mono_block_)
{
}
void PrettyCompactBlockOutputFormat::writeSuffixIfNot()
{
if (mono_chunk)
{
writeChunk(mono_chunk, PortKind::Main);
mono_chunk.clear();
}
}
void PrettyCompactBlockOutputFormat::writeHeader(
const Block & block,
const Widths & max_widths,
@ -159,6 +173,39 @@ void PrettyCompactBlockOutputFormat::write(const Chunk & chunk, PortKind port_ki
total_rows += chunk.getNumRows();
return;
}
if (mono_block)
{
if (port_kind == PortKind::Main)
{
if (!mono_chunk)
{
mono_chunk = chunk.clone();
return;
}
MutableColumns mutation = mono_chunk.mutateColumns();
for (size_t position = 0; position < mutation.size(); ++position)
{
auto column = chunk.getColumns()[position];
mutation[position]->insertRangeFrom(*column, 0, column->size());
}
size_t rows = mutation[0]->size();
mono_chunk.setColumns(std::move(mutation), rows);
return;
}
else
{
/// Should be written from writeSuffixIfNot()
assert(!mono_chunk);
}
}
writeChunk(chunk, port_kind);
}
void PrettyCompactBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind port_kind)
{
UInt64 max_rows = format_settings.pretty.max_rows;
size_t num_rows = chunk.getNumRows();
const auto & header = getPort(port_kind).getHeader();
@ -182,14 +229,17 @@ void PrettyCompactBlockOutputFormat::write(const Chunk & chunk, PortKind port_ki
void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory)
{
factory.registerOutputFormatProcessor("PrettyCompact", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
for (const auto & [name, mono_block] : {std::make_pair("PrettyCompact", false), std::make_pair("PrettyCompactMonoBlock", true)})
{
return std::make_shared<PrettyCompactBlockOutputFormat>(buf, sample, format_settings);
});
factory.registerOutputFormatProcessor(name, [mono_block = mono_block](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
{
return std::make_shared<PrettyCompactBlockOutputFormat>(buf, sample, format_settings, mono_block);
});
}
factory.registerOutputFormatProcessor("PrettyCompactNoEscapes", [](
WriteBuffer & buf,
@ -199,20 +249,8 @@ void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory)
{
FormatSettings changed_settings = format_settings;
changed_settings.pretty.color = false;
return std::make_shared<PrettyCompactBlockOutputFormat>(buf, sample, changed_settings);
return std::make_shared<PrettyCompactBlockOutputFormat>(buf, sample, changed_settings, false /* mono_block */);
});
/// TODO
// factory.registerOutputFormat("PrettyCompactMonoBlock", [](
// WriteBuffer & buf,
// const Block & sample,
// const FormatSettings & format_settings)
// {
// BlockOutputStreamPtr impl = std::make_shared<PrettyCompactBlockOutputFormat>(buf, sample, format_settings);
// auto res = std::make_shared<SquashingBlockOutputStream>(impl, impl->getHeader(), format_settings.pretty.max_rows, 0);
// res->disableFlush();
// return res;
// });
}
}

View File

@ -1,6 +1,8 @@
#pragma once
#include <Processors/Formats/Impl/PrettyBlockOutputFormat.h>
#include <optional>
#include <unordered_map>
namespace DB
@ -11,9 +13,7 @@ namespace DB
class PrettyCompactBlockOutputFormat : public PrettyBlockOutputFormat
{
public:
PrettyCompactBlockOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_)
: PrettyBlockOutputFormat(out_, header, format_settings_) {}
PrettyCompactBlockOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_, bool mono_block_);
String getName() const override { return "PrettyCompactBlockOutputFormat"; }
protected:
@ -26,6 +26,14 @@ protected:
const Columns & columns,
const WidthsPerColumn & widths,
const Widths & max_widths);
private:
bool mono_block;
/// For mono_block == true only
Chunk mono_chunk;
void writeChunk(const Chunk & chunk, PortKind port_kind);
void writeSuffixIfNot() override;
};
}