Use Port::Data instead of Chunk in LazyOutputFormat.

This commit is contained in:
Nikolai Kochetov 2020-12-23 15:54:20 +03:00
parent 3fd0b69fab
commit 7ab38d5007
5 changed files with 63 additions and 35 deletions

View File

@ -5,6 +5,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_)
: IProcessor({header_, header_, header_}, {}), out(out_)
{
@ -30,7 +35,7 @@ IOutputFormat::Status IOutputFormat::prepare()
if (!input.hasData())
return Status::NeedData;
current_chunk = input.pull(true);
current_chunk = input.pullData(true);
current_block_kind = kind;
has_input = true;
return Status::Ready;
@ -44,23 +49,31 @@ IOutputFormat::Status IOutputFormat::prepare()
return Status::Finished;
}
static Chunk prepareTotals(Chunk chunk)
static Port::Data prepareTotals(Port::Data data)
{
if (!chunk.hasRows())
if (data.exception)
return data;
if (!data.chunk.hasRows())
return {};
if (chunk.getNumRows() > 1)
if (data.chunk.getNumRows() > 1)
{
/// This may happen if something like ARRAY JOIN was executed on totals.
/// Skip rows except the first one.
auto columns = chunk.detachColumns();
auto columns = data.chunk.detachColumns();
for (auto & column : columns)
column = column->cut(0, 1);
chunk.setColumns(std::move(columns), 1);
data.chunk.setColumns(std::move(columns), 1);
}
return chunk;
return data;
}
void IOutputFormat::consume(Chunk)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method consume s not implemented for {}", getName());
}
void IOutputFormat::work()
@ -84,17 +97,24 @@ void IOutputFormat::work()
switch (current_block_kind)
{
case Main:
result_rows += current_chunk.getNumRows();
result_bytes += current_chunk.allocatedBytes();
{
result_rows += current_chunk.chunk.getNumRows();
result_bytes += current_chunk.chunk.allocatedBytes();
consume(std::move(current_chunk));
break;
}
case Totals:
if (auto totals = prepareTotals(std::move(current_chunk)))
{
auto totals = prepareTotals(std::move(current_chunk));
if (totals.exception || totals.chunk)
consumeTotals(std::move(totals));
break;
}
case Extremes:
{
consumeExtremes(std::move(current_chunk));
break;
}
}
if (auto_flush)

View File

@ -28,7 +28,7 @@ public:
protected:
WriteBuffer & out;
Chunk current_chunk;
Port::Data current_chunk;
PortKind current_block_kind = PortKind::Main;
bool has_input = false;
bool finished = false;
@ -39,9 +39,14 @@ protected:
RowsBeforeLimitCounterPtr rows_before_limit_counter;
virtual void consume(Chunk) = 0;
virtual void consume(Chunk);
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
virtual void consume(Port::Data data) { consume(data.getChunkOrTrow()); }
virtual void consumeTotals(Port::Data data) { consumeTotals(data.getChunkOrTrow()); }
virtual void consumeExtremes(Port::Data data) { consumeExtremes(data.getChunkOrTrow()); }
virtual void finalize() {}
public:
@ -77,8 +82,8 @@ public:
virtual void doWritePrefix() {}
virtual void doWriteSuffix() { finalize(); }
void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); }
void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); }
void setTotals(const Block & totals) { consumeTotals(Port::Data{.chunk = Chunk(totals.getColumns(), totals.rows())}); }
void setExtremes(const Block & extremes) { consumeExtremes(Port::Data{.chunk = Chunk(extremes.getColumns(), extremes.rows())}); }
size_t getResultRows() const { return result_rows; }
size_t getResultBytes() const { return result_bytes; }

View File

@ -15,24 +15,24 @@ Chunk LazyOutputFormat::getChunk(UInt64 milliseconds)
return {};
}
Chunk chunk;
if (!queue.tryPop(chunk, milliseconds))
Port::Data data;
if (!queue.tryPop(data, milliseconds))
return {};
if (chunk)
info.update(chunk.getNumRows(), chunk.allocatedBytes());
if (!data.exception)
info.update(data.chunk.getNumRows(), data.chunk.allocatedBytes());
return chunk;
return data.getChunkOrTrow();
}
Chunk LazyOutputFormat::getTotals()
{
return std::move(totals);
return totals.getChunkOrTrow();
}
Chunk LazyOutputFormat::getExtremes()
{
return std::move(extremes);
return extremes.getChunkOrTrow();
}
void LazyOutputFormat::setRowsBeforeLimit(size_t rows_before_limit)

View File

@ -37,28 +37,28 @@ public:
}
protected:
void consume(Chunk chunk) override
void consume(Port::Data data) override
{
if (!finished_processing)
queue.emplace(std::move(chunk));
queue.emplace(std::move(data));
}
void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
void consumeTotals(Port::Data data) override { totals = std::move(data); }
void consumeExtremes(Port::Data data) override { extremes = std::move(data); }
void finalize() override
{
finished_processing = true;
/// In case we are waiting for result.
queue.emplace(Chunk());
queue.emplace(Port::Data{});
}
private:
ConcurrentBoundedQueue<Chunk> queue;
Chunk totals;
Chunk extremes;
ConcurrentBoundedQueue<Port::Data> queue;
Port::Data totals;
Port::Data extremes;
/// Is not used.
static WriteBuffer out;

View File

@ -60,6 +60,14 @@ protected:
/// Note: std::variant can be used. But move constructor for it can't be inlined.
Chunk chunk;
std::exception_ptr exception;
Chunk getChunkOrTrow()
{
if (exception)
std::rethrow_exception(std::move(exception));
return std::move(chunk);
}
};
private:
@ -303,12 +311,7 @@ public:
Chunk ALWAYS_INLINE pull(bool set_not_needed = false)
{
auto data_ = pullData(set_not_needed);
if (data_.exception)
std::rethrow_exception(data_.exception);
return std::move(data_.chunk);
return pullData(set_not_needed).getChunkOrTrow();
}
bool ALWAYS_INLINE isFinished() const