2018-05-24 01:02:16 +00:00
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#include <string>
|
|
|
|
#include <Processors/IProcessor.h>
|
2020-03-19 11:45:52 +00:00
|
|
|
#include <Processors/RowsBeforeLimitCounter.h>
|
2019-02-19 18:41:18 +00:00
|
|
|
#include <IO/Progress.h>
|
2021-11-17 20:51:46 +00:00
|
|
|
#include <Common/Stopwatch.h>
|
2018-05-24 01:02:16 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
class WriteBuffer;
|
|
|
|
|
|
|
|
/** Output format have three inputs and no outputs. It writes data from WriteBuffer.
|
|
|
|
*
|
|
|
|
* First input is for main resultset, second is for "totals" and third is for "extremes".
|
|
|
|
* It's not necessarily to connect "totals" or "extremes" ports (they may remain dangling).
|
|
|
|
*
|
|
|
|
* Data from input ports are pulled in order: first, from main input, then totals, then extremes.
|
|
|
|
*
|
|
|
|
* By default, data for "totals" and "extremes" is ignored.
|
|
|
|
*/
|
|
|
|
class IOutputFormat : public IProcessor
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
enum PortKind { Main = 0, Totals = 1, Extremes = 2 };
|
|
|
|
|
2019-08-03 11:02:40 +00:00
|
|
|
IOutputFormat(const Block & header_, WriteBuffer & out_);
|
2018-05-24 01:02:16 +00:00
|
|
|
|
|
|
|
Status prepare() override;
|
|
|
|
void work() override;
|
|
|
|
|
2019-02-19 18:41:18 +00:00
|
|
|
/// Flush output buffers if any.
|
|
|
|
virtual void flush();
|
2018-05-24 01:02:16 +00:00
|
|
|
|
2020-04-19 19:02:36 +00:00
|
|
|
void setAutoFlush() { auto_flush = true; }
|
|
|
|
|
2019-02-19 18:41:18 +00:00
|
|
|
/// Value for rows_before_limit_at_least field.
|
|
|
|
virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}
|
|
|
|
|
2020-03-19 11:45:52 +00:00
|
|
|
/// Counter to calculate rows_before_limit_at_least in processors pipeline.
|
|
|
|
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_counter.swap(counter); }
|
|
|
|
|
2019-02-19 18:41:18 +00:00
|
|
|
/// Notify about progress. Method could be called from different threads.
|
|
|
|
/// Passed value are delta, that must be summarized.
|
|
|
|
virtual void onProgress(const Progress & /*progress*/) {}
|
2018-05-24 01:02:16 +00:00
|
|
|
|
2019-02-19 18:41:18 +00:00
|
|
|
/// Content-Type to set when sending HTTP response.
|
|
|
|
virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; }
|
2018-05-24 01:02:16 +00:00
|
|
|
|
2019-02-27 11:24:14 +00:00
|
|
|
InputPort & getPort(PortKind kind) { return *std::next(inputs.begin(), kind); }
|
2019-07-24 18:00:09 +00:00
|
|
|
|
2021-10-19 09:58:10 +00:00
|
|
|
/// Compatibility with old interface.
|
|
|
|
/// TODO: separate formats and processors.
|
2019-07-24 18:00:09 +00:00
|
|
|
|
2020-04-27 18:15:55 +00:00
|
|
|
void write(const Block & block);
|
2019-07-24 18:00:09 +00:00
|
|
|
|
2021-11-11 18:09:21 +00:00
|
|
|
void finalize();
|
2019-07-24 18:00:09 +00:00
|
|
|
|
2021-09-15 19:35:48 +00:00
|
|
|
virtual bool expectMaterializedColumns() const { return true; }
|
|
|
|
|
2021-11-17 20:51:46 +00:00
|
|
|
void setTotals(const Block & totals)
|
|
|
|
{
|
|
|
|
writeSuffixIfNot();
|
|
|
|
consumeTotals(Chunk(totals.getColumns(), totals.rows()));
|
|
|
|
are_totals_written = true;
|
|
|
|
}
|
|
|
|
void setExtremes(const Block & extremes)
|
|
|
|
{
|
|
|
|
writeSuffixIfNot();
|
|
|
|
consumeExtremes(Chunk(extremes.getColumns(), extremes.rows()));
|
|
|
|
}
|
2020-07-02 14:51:10 +00:00
|
|
|
|
|
|
|
size_t getResultRows() const { return result_rows; }
|
2020-07-02 15:00:37 +00:00
|
|
|
size_t getResultBytes() const { return result_bytes; }
|
2020-07-02 14:51:10 +00:00
|
|
|
|
2021-11-02 13:40:41 +00:00
|
|
|
void doNotWritePrefix() { need_write_prefix = false; }
|
|
|
|
|
|
|
|
protected:
|
|
|
|
friend class ParallelFormattingOutputFormat;
|
|
|
|
|
|
|
|
virtual void consume(Chunk) = 0;
|
|
|
|
virtual void consumeTotals(Chunk) {}
|
|
|
|
virtual void consumeExtremes(Chunk) {}
|
2021-11-11 18:09:21 +00:00
|
|
|
virtual void finalizeImpl() {}
|
2021-11-02 13:40:41 +00:00
|
|
|
virtual void writePrefix() {}
|
2021-11-17 20:51:46 +00:00
|
|
|
virtual void writeSuffix() {}
|
|
|
|
|
2021-11-02 13:40:41 +00:00
|
|
|
void writePrefixIfNot()
|
|
|
|
{
|
|
|
|
if (need_write_prefix)
|
|
|
|
{
|
|
|
|
writePrefix();
|
|
|
|
need_write_prefix = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-11-17 20:51:46 +00:00
|
|
|
void writeSuffixIfNot()
|
|
|
|
{
|
|
|
|
if (need_write_suffix)
|
|
|
|
{
|
|
|
|
writeSuffix();
|
|
|
|
need_write_suffix = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-11-19 13:45:10 +00:00
|
|
|
/// Methods-helpers for parallel formatting.
|
|
|
|
|
|
|
|
/// Set the real number of first row in the first chunk of data.
|
|
|
|
void setFirstRowNumber(size_t first_row_number_)
|
|
|
|
{
|
|
|
|
first_row_number = first_row_number_;
|
|
|
|
onFirstRowNumberUpdate();
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t getFirstRowNumber() const { return first_row_number; }
|
|
|
|
|
|
|
|
/// Update state according to new first_row_number.
|
|
|
|
virtual void onFirstRowNumberUpdate() {}
|
|
|
|
|
|
|
|
/// Some formats outputs some statistics after the data,
|
|
|
|
/// in parallel formatting we collect these statistics outside the
|
|
|
|
/// underling format and then set it to format before finalizing.
|
|
|
|
struct Statistics
|
|
|
|
{
|
|
|
|
Stopwatch watch;
|
|
|
|
Progress progress;
|
|
|
|
bool applied_limit = false;
|
|
|
|
size_t rows_before_limit = 0;
|
|
|
|
};
|
|
|
|
|
|
|
|
void setOutsideStatistics(Statistics statistics) { outside_statistics = std::make_shared<Statistics>(std::move(statistics)); }
|
|
|
|
std::shared_ptr<Statistics> getOutsideStatistics() const { return outside_statistics; }
|
|
|
|
|
|
|
|
/// In some formats the way we print extremes depends on
|
|
|
|
/// were totals printed or not. In this case in parallel formatting
|
|
|
|
/// we should notify underling format if totals were printed.
|
|
|
|
void setTotalsAreWritten() { are_totals_written = true; }
|
|
|
|
bool areTotalsWritten() const { return are_totals_written; }
|
|
|
|
|
2021-11-02 13:40:41 +00:00
|
|
|
WriteBuffer & out;
|
|
|
|
|
|
|
|
Chunk current_chunk;
|
|
|
|
PortKind current_block_kind = PortKind::Main;
|
|
|
|
bool has_input = false;
|
|
|
|
bool finished = false;
|
|
|
|
bool finalized = false;
|
|
|
|
|
|
|
|
/// Flush data on each consumed chunk. This is intended for interactive applications to output data as soon as it's ready.
|
|
|
|
bool auto_flush = false;
|
2021-11-17 20:51:46 +00:00
|
|
|
|
2021-11-02 13:40:41 +00:00
|
|
|
bool need_write_prefix = true;
|
2021-11-17 20:51:46 +00:00
|
|
|
bool need_write_suffix = true;
|
2021-11-02 13:40:41 +00:00
|
|
|
|
|
|
|
RowsBeforeLimitCounterPtr rows_before_limit_counter;
|
|
|
|
|
2020-07-02 14:51:10 +00:00
|
|
|
private:
|
2021-11-17 20:51:46 +00:00
|
|
|
size_t first_row_number = 0;
|
2021-11-19 13:45:10 +00:00
|
|
|
std::shared_ptr<Statistics> outside_statistics = nullptr;
|
2021-11-17 20:51:46 +00:00
|
|
|
bool are_totals_written = false;
|
|
|
|
|
2020-07-02 14:51:10 +00:00
|
|
|
/// Counters for consumed chunks. Are used for QueryLog.
|
|
|
|
size_t result_rows = 0;
|
|
|
|
size_t result_bytes = 0;
|
2018-05-24 01:02:16 +00:00
|
|
|
};
|
|
|
|
}
|