diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 63c64a84308..c37b41c11ae 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -721,6 +721,7 @@ void executeQuery( }); auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader()); + out->setAutoFlush(); /// Save previous progress callback if any. TODO Do it more conveniently. auto previous_progress_callback = context.getProgressCallback(); diff --git a/src/Processors/Formats/IOutputFormat.cpp b/src/Processors/Formats/IOutputFormat.cpp index ff4ac393471..543a854f75e 100644 --- a/src/Processors/Formats/IOutputFormat.cpp +++ b/src/Processors/Formats/IOutputFormat.cpp @@ -69,6 +69,9 @@ void IOutputFormat::work() break; } + if (auto_flush) + flush(); + has_input = false; } diff --git a/src/Processors/Formats/IOutputFormat.h b/src/Processors/Formats/IOutputFormat.h index 1137dd78446..71a0d2f0066 100644 --- a/src/Processors/Formats/IOutputFormat.h +++ b/src/Processors/Formats/IOutputFormat.h @@ -34,6 +34,9 @@ protected: bool finished = false; bool finalized = false; + /// Flush data on each consumed chunk. This is intented for interactive applications to output data as soon as it's ready. + bool auto_flush = false; + RowsBeforeLimitCounterPtr rows_before_limit_counter; virtual void consume(Chunk) = 0; @@ -50,6 +53,8 @@ public: /// Flush output buffers if any. virtual void flush(); + void setAutoFlush() { auto_flush = true; } + /// Value for rows_before_limit_at_least field. virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}