Revert "Merge pull request #18411 from ClickHouse/try-fix-max_result_rows"

This reverts commit 9a2b163db7, reversing
changes made to 5e97df7c29.
This commit is contained in:
Nikolai Kochetov 2021-01-03 21:51:57 +03:00
parent 768e4871a4
commit 3a1ec56486
7 changed files with 37 additions and 76 deletions

View File

@ -5,11 +5,6 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_) IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_)
: IProcessor({header_, header_, header_}, {}), out(out_) : IProcessor({header_, header_, header_}, {}), out(out_)
{ {
@ -35,7 +30,7 @@ IOutputFormat::Status IOutputFormat::prepare()
if (!input.hasData()) if (!input.hasData())
return Status::NeedData; return Status::NeedData;
current_chunk = input.pullData(true); current_chunk = input.pull(true);
current_block_kind = kind; current_block_kind = kind;
has_input = true; has_input = true;
return Status::Ready; return Status::Ready;
@ -49,31 +44,23 @@ IOutputFormat::Status IOutputFormat::prepare()
return Status::Finished; return Status::Finished;
} }
static Port::Data prepareTotals(Port::Data data) static Chunk prepareTotals(Chunk chunk)
{ {
if (data.exception) if (!chunk.hasRows())
return data;
if (!data.chunk.hasRows())
return {}; return {};
if (data.chunk.getNumRows() > 1) if (chunk.getNumRows() > 1)
{ {
/// This may happen if something like ARRAY JOIN was executed on totals. /// This may happen if something like ARRAY JOIN was executed on totals.
/// Skip rows except the first one. /// Skip rows except the first one.
auto columns = data.chunk.detachColumns(); auto columns = chunk.detachColumns();
for (auto & column : columns) for (auto & column : columns)
column = column->cut(0, 1); column = column->cut(0, 1);
data.chunk.setColumns(std::move(columns), 1); chunk.setColumns(std::move(columns), 1);
} }
return data; return chunk;
}
void IOutputFormat::consume(Chunk)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method consume s not implemented for {}", getName());
} }
void IOutputFormat::work() void IOutputFormat::work()
@ -97,25 +84,18 @@ void IOutputFormat::work()
switch (current_block_kind) switch (current_block_kind)
{ {
case Main: case Main:
{ result_rows += current_chunk.getNumRows();
result_rows += current_chunk.chunk.getNumRows(); result_bytes += current_chunk.allocatedBytes();
result_bytes += current_chunk.chunk.allocatedBytes();
consume(std::move(current_chunk)); consume(std::move(current_chunk));
break; break;
}
case Totals: case Totals:
{ if (auto totals = prepareTotals(std::move(current_chunk)))
auto totals = prepareTotals(std::move(current_chunk));
if (totals.exception || totals.chunk)
consumeTotals(std::move(totals)); consumeTotals(std::move(totals));
break; break;
}
case Extremes: case Extremes:
{
consumeExtremes(std::move(current_chunk)); consumeExtremes(std::move(current_chunk));
break; break;
} }
}
if (auto_flush) if (auto_flush)
flush(); flush();

View File

@ -28,7 +28,7 @@ public:
protected: protected:
WriteBuffer & out; WriteBuffer & out;
Port::Data current_chunk; Chunk current_chunk;
PortKind current_block_kind = PortKind::Main; PortKind current_block_kind = PortKind::Main;
bool has_input = false; bool has_input = false;
bool finished = false; bool finished = false;
@ -41,14 +41,9 @@ protected:
friend class ParallelFormattingOutputFormat; friend class ParallelFormattingOutputFormat;
virtual void consume(Chunk); virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {} virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {} virtual void consumeExtremes(Chunk) {}
virtual void consume(Port::Data data) { consume(data.getChunkOrTrow()); }
virtual void consumeTotals(Port::Data data) { consumeTotals(data.getChunkOrTrow()); }
virtual void consumeExtremes(Port::Data data) { consumeExtremes(data.getChunkOrTrow()); }
virtual void finalize() {} virtual void finalize() {}
public: public:
@ -84,19 +79,8 @@ public:
virtual void doWritePrefix() {} virtual void doWritePrefix() {}
virtual void doWriteSuffix() { finalize(); } virtual void doWriteSuffix() { finalize(); }
void setTotals(const Block & totals) void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); }
{ void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); }
Port::Data data;
data.chunk = Chunk(totals.getColumns(), totals.rows());
consumeTotals(std::move(data));
}
void setExtremes(const Block & extremes)
{
Port::Data data;
data.chunk = Chunk(extremes.getColumns(), extremes.rows());
consumeExtremes(std::move(data));
}
size_t getResultRows() const { return result_rows; } size_t getResultRows() const { return result_rows; }
size_t getResultBytes() const { return result_bytes; } size_t getResultBytes() const { return result_bytes; }

View File

@ -15,24 +15,24 @@ Chunk LazyOutputFormat::getChunk(UInt64 milliseconds)
return {}; return {};
} }
Port::Data data; Chunk chunk;
if (!queue.tryPop(data, milliseconds)) if (!queue.tryPop(chunk, milliseconds))
return {}; return {};
if (!data.exception) if (chunk)
info.update(data.chunk.getNumRows(), data.chunk.allocatedBytes()); info.update(chunk.getNumRows(), chunk.allocatedBytes());
return data.getChunkOrTrow(); return chunk;
} }
Chunk LazyOutputFormat::getTotals() Chunk LazyOutputFormat::getTotals()
{ {
return totals.getChunkOrTrow(); return std::move(totals);
} }
Chunk LazyOutputFormat::getExtremes() Chunk LazyOutputFormat::getExtremes()
{ {
return extremes.getChunkOrTrow(); return std::move(extremes);
} }
void LazyOutputFormat::setRowsBeforeLimit(size_t rows_before_limit) void LazyOutputFormat::setRowsBeforeLimit(size_t rows_before_limit)

View File

@ -37,28 +37,28 @@ public:
} }
protected: protected:
void consume(Port::Data data) override void consume(Chunk chunk) override
{ {
if (!finished_processing) if (!finished_processing)
queue.emplace(std::move(data)); queue.emplace(std::move(chunk));
} }
void consumeTotals(Port::Data data) override { totals = std::move(data); } void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
void consumeExtremes(Port::Data data) override { extremes = std::move(data); } void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
void finalize() override void finalize() override
{ {
finished_processing = true; finished_processing = true;
/// In case we are waiting for result. /// In case we are waiting for result.
queue.emplace(Port::Data{}); queue.emplace(Chunk());
} }
private: private:
ConcurrentBoundedQueue<Port::Data> queue; ConcurrentBoundedQueue<Chunk> queue;
Port::Data totals; Chunk totals;
Port::Data extremes; Chunk extremes;
/// Is not used. /// Is not used.
static WriteBuffer out; static WriteBuffer out;

View File

@ -60,14 +60,6 @@ protected:
/// Note: std::variant can be used. But move constructor for it can't be inlined. /// Note: std::variant can be used. But move constructor for it can't be inlined.
Chunk chunk; Chunk chunk;
std::exception_ptr exception; std::exception_ptr exception;
Chunk getChunkOrTrow()
{
if (exception)
std::rethrow_exception(std::move(exception));
return std::move(chunk);
}
}; };
private: private:
@ -311,7 +303,12 @@ public:
Chunk ALWAYS_INLINE pull(bool set_not_needed = false) Chunk ALWAYS_INLINE pull(bool set_not_needed = false)
{ {
return pullData(set_not_needed).getChunkOrTrow(); auto data_ = pullData(set_not_needed);
if (data_.exception)
std::rethrow_exception(data_.exception);
return std::move(data_.chunk);
} }
bool ALWAYS_INLINE isFinished() const bool ALWAYS_INLINE isFinished() const

View File

@ -12,7 +12,7 @@ You must install latest Docker from
https://docs.docker.com/engine/installation/linux/docker-ce/ubuntu/#set-up-the-repository https://docs.docker.com/engine/installation/linux/docker-ce/ubuntu/#set-up-the-repository
Don't use Docker from your system repository. Don't use Docker from your system repository.
* [pip](https://pypi.python.org/pypi/pip) and `libpq-dev`. To install: `sudo apt-get install python3-pip libpq-dev zlib1g-dev libcrypto++-dev libssl-dev libkrb5-dev` * [pip](https://pypi.python.org/pypi/pip) and `libpq-dev`. To install: `sudo apt-get install python3-pip libpq-dev zlib1g-dev libcrypto++-dev libssl-dev`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest` * [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: * [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install:

View File

@ -239,7 +239,7 @@ def test_progress():
, output: "6\\t0\\n7\\t0\\n" , output: "6\\t0\\n7\\t0\\n"
, stats { , stats {
rows: 8 rows: 8
blocks: 5 blocks: 4
allocated_bytes: 324 allocated_bytes: 324
applied_limit: true applied_limit: true
rows_before_limit: 8 rows_before_limit: 8