Refactor progress bar, adjust progress indication for reading from file

This commit is contained in:
kssenii 2021-05-13 22:56:42 +00:00
parent e82d717233
commit c25be65ebf
11 changed files with 304 additions and 269 deletions

View File

@ -30,7 +30,6 @@
#include <common/find_symbols.h>
#include <common/LineReader.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/ShellCommand.h>
#include <Common/UnicodeBar.h>
@ -86,7 +85,7 @@
#include <common/argsToConfig.h>
#include <Common/TerminalSize.h>
#include <Common/UTF8Helpers.h>
#include <Common/ProgressBar.h>
#include <Common/ProgressIndication.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config_version.h>
@ -228,13 +227,13 @@ private:
String server_version;
String server_display_name;
Stopwatch watch;
/// true by default - for interactive mode, might be changed when --progress option is checked for
/// non-interactive mode.
bool need_render_progress = true;
/// The server periodically sends information about how much data was read since last time.
Progress progress;
bool written_first_block = false;
/// Progress bar
ProgressBar progress_bar;
ProgressIndication progress_indication;
/// External tables info.
std::list<ExternalTable> external_tables;
@ -534,7 +533,7 @@ private:
if (!is_interactive)
{
progress_bar.need_render_progress = config().getBool("progress", false);
need_render_progress = config().getBool("progress", false);
echo_queries = config().getBool("echo", false);
ignore_error = config().getBool("ignore-error", false);
}
@ -1556,12 +1555,9 @@ private:
}
}
watch.restart();
processed_rows = 0;
progress.reset();
progress_bar.show_progress_bar = false;
progress_bar.written_progress_chars = 0;
progress_bar.written_first_block = false;
written_first_block = false;
progress_indication.resetProgress();
{
/// Temporarily apply query settings to context.
@ -1628,16 +1624,15 @@ private:
if (is_interactive)
{
std::cout << std::endl << processed_rows << " rows in set. Elapsed: " << watch.elapsedSeconds() << " sec. ";
if (progress.read_rows >= 1000)
writeFinalProgress();
std::cout << std::endl << processed_rows << " rows in set. Elapsed: " << progress_indication.elapsedSeconds() << " sec. ";
/// Write final progress if it makes sence to do so.
writeFinalProgress();
std::cout << std::endl << std::endl;
}
else if (print_time_to_stderr)
{
std::cerr << watch.elapsedSeconds() << "\n";
std::cerr << progress_indication.elapsedSeconds() << "\n";
}
}
@ -1813,7 +1808,17 @@ private:
try
{
if (need_render_progress)
{
/// Set total_bytes_to_read for current fd.
FileProgress file_progress(0, std_in.size());
progress_indication.updateProgress(Progress(file_progress));
/// Set callback to be called on file progress.
progress_indication.setFileProgressCallback(context, true);
/// Add callback to track reading from fd.
std_in.setProgressCallback(context);
}
sendDataFrom(std_in, sample, columns_description);
}
@ -1937,7 +1942,7 @@ private:
cancelled = true;
if (is_interactive)
{
progress_bar.clearProgress();
progress_indication.clearProgressOutput();
std::cout << "Cancelling query." << std::endl;
}
@ -2164,7 +2169,7 @@ private:
current_format = "Vertical";
/// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly.
if (!progress_bar.need_render_progress)
if (!need_render_progress)
block_out_stream = context->getOutputStreamParallelIfPossible(current_format, *out_buf, block);
else
block_out_stream = context->getOutputStream(current_format, *out_buf, block);
@ -2223,25 +2228,25 @@ private:
if (block.rows() == 0 || (query_fuzzer_runs != 0 && processed_rows >= 100))
return;
if (progress_bar.need_render_progress)
progress_bar.clearProgress();
if (need_render_progress)
progress_indication.clearProgressOutput();
block_out_stream->write(block);
progress_bar.written_first_block = true;
written_first_block = true;
/// Received data block is immediately displayed to the user.
block_out_stream->flush();
/// Restore progress bar after data block.
if (progress_bar.need_render_progress)
progress_bar.writeProgress(progress, watch.elapsed());
if (need_render_progress)
progress_indication.writeProgress();
}
void onLogData(Block & block)
{
initLogsOutputStream();
progress_bar.clearProgress();
progress_indication.clearProgressOutput();
logs_out_stream->write(block);
logs_out_stream->flush();
}
@ -2262,28 +2267,23 @@ private:
void onProgress(const Progress & value)
{
if (!progress_bar.updateProgress(progress, value))
if (!progress_indication.updateProgress(value))
{
// Just a keep-alive update.
return;
}
if (block_out_stream)
block_out_stream->onProgress(value);
progress_bar.writeProgress(progress, watch.elapsed());
if (need_render_progress)
progress_indication.writeProgress();
}
void writeFinalProgress()
{
std::cout << "Processed " << formatReadableQuantity(progress.read_rows) << " rows, "
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes);
size_t elapsed_ns = watch.elapsed();
if (elapsed_ns)
std::cout << " (" << formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., "
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
else
std::cout << ". ";
progress_indication.writeFinalProgress();
}
@ -2304,7 +2304,7 @@ private:
void onEndOfStream()
{
progress_bar.clearProgress();
progress_indication.clearProgressOutput();
if (block_out_stream)
block_out_stream->writeSuffix();
@ -2314,9 +2314,9 @@ private:
resetOutput();
if (is_interactive && !progress_bar.written_first_block)
if (is_interactive && !written_first_block)
{
progress_bar.clearProgress();
progress_indication.clearProgressOutput();
std::cout << "Ok." << std::endl;
}
}

View File

@ -389,34 +389,29 @@ void LocalServer::processQueries()
CurrentThread::QueryScope query_scope_holder(context);
///Set progress show
progress_bar.need_render_progress = config().getBool("progress", false);
need_render_progress = config().getBool("progress", false);
if (progress_bar.need_render_progress)
if (need_render_progress)
{
context->setProgressCallback([&](const Progress & value)
{
if (!progress_bar.updateProgress(progress, value))
{
// Just a keep-alive update.
return;
}
progress_bar.writeProgress(progress, watch.elapsed());
});
{
/// Write progress only if progress was updated
if (progress_indication.updateProgress(value))
progress_indication.writeProgress();
});
}
bool echo_queries = config().hasOption("echo") || config().hasOption("verbose");
if (config().hasOption("progress"))
context->setRenderProgress();
if (need_render_progress)
progress_indication.setFileProgressCallback(context);
std::exception_ptr exception;
for (const auto & query : queries)
{
watch.restart();
progress.reset();
progress_bar.show_progress_bar = false;
progress_bar.written_progress_chars = 0;
progress_bar.written_first_block = false;
written_first_block = false;
progress_indication.resetProgress();
ReadBufferFromString read_buf(query);
WriteBufferFromFileDescriptor write_buf(STDOUT_FILENO);
@ -566,7 +561,6 @@ void LocalServer::init(int argc, char ** argv)
("output-format", po::value<std::string>(), "default output format")
("stacktrace", "print stack traces of exceptions")
("progress", "show progress for File table engine")
("echo", "print query before execution")
("verbose", "print query and other debugging info")
("logger.console", po::value<bool>()->implicit_value(true), "Log to console")

View File

@ -7,7 +7,7 @@
#include <Interpreters/Context.h>
#include <loggers/Loggers.h>
#include <Poco/Util/Application.h>
#include <Common/ProgressBar.h>
#include <Common/ProgressIndication.h>
namespace DB
{
@ -49,9 +49,12 @@ protected:
/// Settings specified via command line args
Settings cmd_settings;
ProgressBar progress_bar;
Progress progress;
Stopwatch watch;
bool need_render_progress = false;
bool written_first_block = false;
ProgressIndication progress_indication;
std::optional<std::filesystem::path> temporary_directory_to_delete;
};

View File

@ -1,32 +0,0 @@
#pragma once
#include <Common/Stopwatch.h>
#include <IO/Progress.h>
/// http://en.wikipedia.org/wiki/ANSI_escape_code
#define CLEAR_TO_END_OF_LINE "\033[K"
namespace DB
{
struct ProgressBar
{
public:
static bool updateProgress(Progress & progress, const Progress & value);
void writeProgress(const Progress & progress, const size_t elapsed_ns);
void clearProgress();
/// For interactive mode always show progress bar, for non-interactive mode it is accessed from config().
bool need_render_progress = true;
bool show_progress_bar = false;
size_t written_progress_chars = 0;
bool written_first_block = false;
bool clear_progress = false;
};
}

View File

@ -1,22 +1,64 @@
#include "ProgressBar.h"
#include "ProgressIndication.h"
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Common/TerminalSize.h>
#include <Common/UnicodeBar.h>
#include <Databases/DatabaseMemory.h>
namespace DB
{
bool ProgressBar::updateProgress(Progress & progress, const Progress & value)
bool ProgressIndication::updateProgress(const Progress & value)
{
return progress.incrementPiecewiseAtomically(value);
}
void ProgressBar::writeProgress(const Progress & progress, const size_t elapsed_ns)
void ProgressIndication::clearProgressOutput()
{
if (!need_render_progress)
if (written_progress_chars)
{
written_progress_chars = 0;
std::cerr << "\r" CLEAR_TO_END_OF_LINE;
}
}
void ProgressIndication::resetProgress()
{
watch.restart();
progress.reset();
show_progress_bar = false;
written_progress_chars = 0;
}
void ProgressIndication::setFileProgressCallback(ContextPtr context, bool write_progress_on_update)
{
context->setFileProgressCallback([&](const FileProgress & file_progress)
{
progress.incrementPiecewiseAtomically(Progress(file_progress));
if (write_progress_on_update)
writeProgress();
});
}
void ProgressIndication::writeFinalProgress()
{
if (progress.read_rows < 1000)
return;
std::cout << "Processed " << formatReadableQuantity(progress.read_rows) << " rows, "
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes);
auto elapsed_ns = watch.elapsed();
if (elapsed_ns)
std::cout << " (" << formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., "
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
else
std::cout << ". ";
}
void ProgressIndication::writeProgress()
{
/// Output all progress bar commands to stderr at once to avoid flicker.
WriteBufferFromFileDescriptor message(STDERR_FILENO, 1024);
@ -45,26 +87,37 @@ void ProgressBar::writeProgress(const Progress & progress, const size_t elapsed_
message << '\r';
size_t prefix_size = message.count();
size_t read_bytes = progress.read_raw_bytes ? progress.read_raw_bytes : progress.read_bytes;
message << indicator << " Progress: ";
message
<< formatReadableQuantity(progress.read_rows) << " rows, "
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes);
<< formatReadableSizeWithDecimalSuffix(read_bytes);
auto elapsed_ns = watch.elapsed();
if (elapsed_ns)
message << " ("
<< formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., "
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
<< formatReadableSizeWithDecimalSuffix(read_bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
else
message << ". ";
written_progress_chars = message.count() - prefix_size - (strlen(indicator) - 2); /// Don't count invisible output (escape sequences).
/// If the approximate number of rows to process is known, we can display a progress bar and percentage.
if (progress.total_rows_to_read > 0)
if (progress.total_rows_to_read || progress.total_raw_bytes_to_read)
{
size_t total_rows_corrected = std::max(progress.read_rows, progress.total_rows_to_read);
size_t current_count, max_count;
if (progress.total_rows_to_read)
{
current_count = progress.read_rows;
max_count = std::max(progress.read_rows, progress.total_rows_to_read);
}
else
{
current_count = progress.read_raw_bytes;
max_count = std::max(progress.read_raw_bytes, progress.total_raw_bytes_to_read);
}
/// To avoid flicker, display progress bar only if .5 seconds have passed since query execution start
/// and the query is less than halfway done.
@ -72,7 +125,7 @@ void ProgressBar::writeProgress(const Progress & progress, const size_t elapsed_
if (elapsed_ns > 500000000)
{
/// Trigger to start displaying progress bar. If query is mostly done, don't display it.
if (progress.read_rows * 2 < total_rows_corrected)
if (current_count * 2 < max_count)
show_progress_bar = true;
if (show_progress_bar)
@ -81,7 +134,7 @@ void ProgressBar::writeProgress(const Progress & progress, const size_t elapsed_
if (width_of_progress_bar > 0)
{
std::string bar
= UnicodeBar::render(UnicodeBar::getWidth(progress.read_rows, 0, total_rows_corrected, width_of_progress_bar));
= UnicodeBar::render(UnicodeBar::getWidth(current_count, 0, max_count, width_of_progress_bar));
message << "\033[0;32m" << bar << "\033[0m";
if (width_of_progress_bar > static_cast<ssize_t>(bar.size() / UNICODE_BAR_CHAR_SIZE))
message << std::string(width_of_progress_bar - bar.size() / UNICODE_BAR_CHAR_SIZE, ' ');
@ -90,7 +143,7 @@ void ProgressBar::writeProgress(const Progress & progress, const size_t elapsed_
}
/// Underestimate percentage a bit to avoid displaying 100%.
message << ' ' << (99 * progress.read_rows / total_rows_corrected) << '%';
message << ' ' << (99 * current_count / max_count) << '%';
}
message << CLEAR_TO_END_OF_LINE;
@ -99,13 +152,4 @@ void ProgressBar::writeProgress(const Progress & progress, const size_t elapsed_
message.next();
}
void ProgressBar::clearProgress()
{
if (written_progress_chars)
{
written_progress_chars = 0;
std::cerr << "\r" CLEAR_TO_END_OF_LINE;
}
}
}

View File

@ -0,0 +1,61 @@
#pragma once
#include <Common/Stopwatch.h>
#include <IO/Progress.h>
#include <Interpreters/Context.h>
#include <Common/Stopwatch.h>
/// http://en.wikipedia.org/wiki/ANSI_escape_code
#define CLEAR_TO_END_OF_LINE "\033[K"
namespace DB
{
class ProgressIndication
{
public:
/// Write progress to stderr.
void writeProgress();
void writeFinalProgress();
/// Clear stderr output.
void clearProgressOutput();
/// Reset progress values.
void resetProgress();
/// Update Progress object. It can be updated from:
/// 1. onProgress in clickhouse-client;
/// 2. ProgressCallback via setProgressCallback methrod in:
/// - context (used in clickhouse-local, can also be added in arbitrary place)
/// - SourceWithProgress (also in streams)
/// - readBufferFromFileDescriptor (for file processing progress)
bool updateProgress(const Progress & value);
/// In some cases there is a need to update progress value, when there is no access to progress_inidcation object.
/// In this case it is added via context.
/// `write_progress_on_update` is needed to write progress for loading files data via pipe in non-interactive mode.
void setFileProgressCallback(ContextPtr context, bool write_progress_on_update = false);
/// How much seconds passed since query execution start.
UInt64 elapsedSeconds() const { return watch.elapsedSeconds(); }
private:
/// Show progress bar only if on first writeProgress call the query is not yet half done.
bool show_progress_bar = false;
/// Width of how much has been printed currently into stderr. Used to define size of progress bar and
/// to check whether progress output needs to be cleared.
size_t written_progress_chars = 0;
/// The server periodically sends information about how much data was read since last time.
/// This information is stored here.
Progress progress;
/// Track query execution time.
Stopwatch watch;
};
}

View File

@ -63,6 +63,83 @@ void ProgressValues::writeJSON(WriteBuffer & out) const
writeCString("\"}", out);
}
bool Progress::incrementPiecewiseAtomically(const Progress & rhs)
{
read_rows += rhs.read_rows;
read_bytes += rhs.read_bytes;
read_raw_bytes += rhs.read_raw_bytes;
total_rows_to_read += rhs.total_rows_to_read;
total_raw_bytes_to_read += rhs.total_raw_bytes_to_read;
written_rows += rhs.written_rows;
written_bytes += rhs.written_bytes;
return rhs.read_rows || rhs.written_rows;
}
void Progress::reset()
{
read_rows = 0;
read_bytes = 0;
read_raw_bytes = 0;
total_rows_to_read = 0;
total_raw_bytes_to_read = 0;
written_rows = 0;
written_bytes = 0;
}
ProgressValues Progress::getValues() const
{
ProgressValues res;
res.read_rows = read_rows.load(std::memory_order_relaxed);
res.read_bytes = read_bytes.load(std::memory_order_relaxed);
res.read_raw_bytes = read_raw_bytes.load(std::memory_order_relaxed);
res.total_rows_to_read = total_rows_to_read.load(std::memory_order_relaxed);
res.total_raw_bytes_to_read = total_raw_bytes_to_read.load(std::memory_order_relaxed);
res.written_rows = written_rows.load(std::memory_order_relaxed);
res.written_bytes = written_bytes.load(std::memory_order_relaxed);
return res;
}
ProgressValues Progress::fetchAndResetPiecewiseAtomically()
{
ProgressValues res;
res.read_rows = read_rows.fetch_and(0);
res.read_bytes = read_bytes.fetch_and(0);
res.read_raw_bytes = read_raw_bytes.fetch_and(0);
res.total_rows_to_read = total_rows_to_read.fetch_and(0);
res.total_raw_bytes_to_read = total_raw_bytes_to_read.fetch_and(0);
res.written_rows = written_rows.fetch_and(0);
res.written_bytes = written_bytes.fetch_and(0);
return res;
}
Progress & Progress::operator=(Progress && other)
{
read_rows = other.read_rows.load(std::memory_order_relaxed);
read_bytes = other.read_bytes.load(std::memory_order_relaxed);
read_raw_bytes = other.read_raw_bytes.load(std::memory_order_relaxed);
total_rows_to_read = other.total_rows_to_read.load(std::memory_order_relaxed);
total_raw_bytes_to_read = other.total_raw_bytes_to_read.load(std::memory_order_relaxed);
written_rows = other.written_rows.load(std::memory_order_relaxed);
written_bytes = other.written_bytes.load(std::memory_order_relaxed);
return *this;
}
void Progress::read(ReadBuffer & in, UInt64 server_revision)
{
ProgressValues values;

View File

@ -19,7 +19,11 @@ struct ProgressValues
{
size_t read_rows;
size_t read_bytes;
size_t read_raw_bytes;
size_t total_rows_to_read;
size_t total_raw_bytes_to_read;
size_t written_rows;
size_t written_bytes;
@ -47,32 +51,13 @@ struct WriteProgress
: written_rows(written_rows_), written_bytes(written_bytes_) {}
};
/// Track progress of processing file base buffer.
/// Used to display progress of loading data from stdin, for file table engine, etc.
struct FileProgress
{
/// Track elapsed time.
Stopwatch watch;
size_t total_bytes_to_process;
/// Here read_bytes (raw bytes) - do not equal ReadProgress::read_bytes, which are calculated according to column types.
size_t read_bytes;
size_t total_bytes_to_read;
/// FileProgress lies in Context and accessed via const reference.
/// These fields are allowed to be updated in a progress callback.
mutable std::atomic<uint64_t> processed_bytes;
FileProgress() : total_bytes_to_process(0), processed_bytes(0) {}
FileProgress(const FileProgress & other)
: watch(other.watch)
, total_bytes_to_process(other.total_bytes_to_process)
, processed_bytes(other.processed_bytes.load()) {}
FileProgress & operator=(FileProgress other)
{
watch = other.watch;
total_bytes_to_process = other.total_bytes_to_process;
processed_bytes = other.processed_bytes.load();
return *this;
}
FileProgress(size_t read_bytes_, size_t total_bytes_to_read_ = 0) : read_bytes(read_bytes_), total_bytes_to_read(total_bytes_to_read_) {}
};
@ -84,87 +69,50 @@ struct Progress
{
std::atomic<size_t> read_rows {0}; /// Rows (source) processed.
std::atomic<size_t> read_bytes {0}; /// Bytes (uncompressed, source) processed.
std::atomic<size_t> read_raw_bytes {0}; /// Raw bytes processed.
/** How much rows must be processed, in total, approximately. Non-zero value is sent when there is information about some new part of job.
* Received values must be summed to get estimate of total rows to process.
/** How much rows/bytes must be processed, in total, approximately. Non-zero value is sent when there is information about
* some new part of job. Received values must be summed to get estimate of total rows to process.
* `total_raw_bytes_to_process` is used for file table engine or when reading from file descriptor.
* Used for rendering progress bar on client.
*/
std::atomic<size_t> total_rows_to_read {0};
std::atomic<size_t> total_raw_bytes_to_read {0};
std::atomic<size_t> written_rows {0};
std::atomic<size_t> written_bytes {0};
Progress() = default;
Progress(size_t read_rows_, size_t read_bytes_, size_t total_rows_to_read_ = 0)
: read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_) {}
explicit Progress(ReadProgress read_progress)
: read_rows(read_progress.read_rows), read_bytes(read_progress.read_bytes), total_rows_to_read(read_progress.total_rows_to_read) {}
explicit Progress(WriteProgress write_progress)
: written_rows(write_progress.written_rows), written_bytes(write_progress.written_bytes) {}
explicit Progress(FileProgress file_progress)
: read_raw_bytes(file_progress.read_bytes), total_raw_bytes_to_read(file_progress.total_bytes_to_read) {}
void read(ReadBuffer & in, UInt64 server_revision);
void write(WriteBuffer & out, UInt64 client_revision) const;
/// Progress in JSON format (single line, without whitespaces) is used in HTTP headers.
void writeJSON(WriteBuffer & out) const;
/// Each value separately is changed atomically (but not whole object).
bool incrementPiecewiseAtomically(const Progress & rhs)
{
read_rows += rhs.read_rows;
read_bytes += rhs.read_bytes;
total_rows_to_read += rhs.total_rows_to_read;
written_rows += rhs.written_rows;
written_bytes += rhs.written_bytes;
bool incrementPiecewiseAtomically(const Progress & rhs);
return rhs.read_rows || rhs.written_rows;
}
void reset();
void reset()
{
read_rows = 0;
read_bytes = 0;
total_rows_to_read = 0;
written_rows = 0;
written_bytes = 0;
}
ProgressValues getValues() const;
ProgressValues getValues() const
{
ProgressValues res;
ProgressValues fetchAndResetPiecewiseAtomically();
res.read_rows = read_rows.load(std::memory_order_relaxed);
res.read_bytes = read_bytes.load(std::memory_order_relaxed);
res.total_rows_to_read = total_rows_to_read.load(std::memory_order_relaxed);
res.written_rows = written_rows.load(std::memory_order_relaxed);
res.written_bytes = written_bytes.load(std::memory_order_relaxed);
return res;
}
ProgressValues fetchAndResetPiecewiseAtomically()
{
ProgressValues res;
res.read_rows = read_rows.fetch_and(0);
res.read_bytes = read_bytes.fetch_and(0);
res.total_rows_to_read = total_rows_to_read.fetch_and(0);
res.written_rows = written_rows.fetch_and(0);
res.written_bytes = written_bytes.fetch_and(0);
return res;
}
Progress & operator=(Progress && other)
{
read_rows = other.read_rows.load(std::memory_order_relaxed);
read_bytes = other.read_bytes.load(std::memory_order_relaxed);
total_rows_to_read = other.total_rows_to_read.load(std::memory_order_relaxed);
written_rows = other.written_rows.load(std::memory_order_relaxed);
written_bytes = other.written_bytes.load(std::memory_order_relaxed);
return *this;
}
Progress & operator=(Progress && other);
Progress(Progress && other)
{

View File

@ -13,8 +13,6 @@
#include <Common/TerminalSize.h>
#include <IO/Operators.h>
#define CLEAR_TO_END_OF_LINE "\033[K"
namespace ProfileEvents
{
@ -191,71 +189,14 @@ off_t ReadBufferFromFileDescriptor::size()
void ReadBufferFromFileDescriptor::setProgressCallback(ContextPtr context)
{
/// Keep file progress and total bytes to process in context and not in readBuffer, because
/// multiple files might share the same progress (for example, for file table engine when globs are used)
/// and total_bytes_to_process will contain sum of sizes of all files.
auto file_progress_callback = context->getFileProgressCallback();
if (!context->getFileProgress().total_bytes_to_process)
context->setFileTotalBytesToProcess(size());
if (!file_progress_callback)
return;
setProfileCallback([context](const ProfileInfo & progress)
setProfileCallback([file_progress_callback](const ProfileInfo & progress)
{
static size_t increment = 0;
static const char * indicators[8] =
{
"\033[1;30m→\033[0m",
"\033[1;31m↘\033[0m",
"\033[1;32m↓\033[0m",
"\033[1;33m↙\033[0m",
"\033[1;34m←\033[0m",
"\033[1;35m↖\033[0m",
"\033[1;36m↑\033[0m",
"\033[1m↗\033[0m",
};
size_t terminal_width = getTerminalWidth();
WriteBufferFromFileDescriptor message(STDERR_FILENO, 1024);
const auto & file_progress = context->getFileProgress();
if (!file_progress.processed_bytes)
message << std::string(terminal_width, ' ');
message << '\r';
file_progress.processed_bytes += progress.bytes_read;
const char * indicator = indicators[increment % 8];
size_t prefix_size = message.count();
size_t processed_bytes = file_progress.processed_bytes.load();
message << indicator << " Progress: ";
message << formatReadableSizeWithDecimalSuffix(file_progress.processed_bytes);
message << " from " << formatReadableSizeWithDecimalSuffix(file_progress.total_bytes_to_process) << " bytes. ";
/// Display progress bar only if .25 seconds have passed since query execution start.
size_t elapsed_ns = file_progress.watch.elapsed();
if (elapsed_ns > 25000000)
{
size_t written_progress_chars = message.count() - prefix_size - (strlen(indicator) - 1); /// Don't count invisible output (escape sequences).
ssize_t width_of_progress_bar = static_cast<ssize_t>(terminal_width) - written_progress_chars - strlen(" 99%");
size_t total_bytes_corrected = std::max(processed_bytes, file_progress.total_bytes_to_process);
if (width_of_progress_bar > 0 && progress.bytes_read > 0)
{
std::string bar = UnicodeBar::render(UnicodeBar::getWidth(processed_bytes, 0, total_bytes_corrected, width_of_progress_bar));
message << "\033[0;32m" << bar << "\033[0m";
if (width_of_progress_bar > static_cast<ssize_t>(bar.size() / UNICODE_BAR_CHAR_SIZE))
message << std::string(width_of_progress_bar - bar.size() / UNICODE_BAR_CHAR_SIZE, ' ');
}
/// Underestimate percentage a bit to avoid displaying 100%.
message << ' ' << (99 * file_progress.processed_bytes / file_progress.total_bytes_to_process) << '%';
}
message << CLEAR_TO_END_OF_LINE;
message.next();
++increment;
file_progress_callback(FileProgress(progress.bytes_read, 0));
});
}

View File

@ -181,8 +181,9 @@ private:
using ProgressCallback = std::function<void(const Progress & progress)>;
ProgressCallback progress_callback; /// Callback for tracking progress of query execution.
FileProgress file_progress; /// Progress data to track processing of file based buffer(s).
bool render_progress = false;
using FileProgressCallback = std::function<void(const FileProgress & progress)>;
FileProgressCallback file_progress_callback; /// Callback for tracking progress of file loading.
QueryStatus * process_list_elem = nullptr; /// For tracking total resource usage for query.
StorageID insertion_table = StorageID::createEmpty(); /// Saved insertion table in query context
@ -588,11 +589,8 @@ public:
/// Used in InterpreterSelectQuery to pass it to the IBlockInputStream.
ProgressCallback getProgressCallback() const;
void setRenderProgress() { render_progress = true; }
bool needRenderProgress() const { return render_progress; }
const FileProgress & getFileProgress() { return file_progress; }
void setFileTotalBytesToProcess(size_t num_bytes) { file_progress.total_bytes_to_process = num_bytes; }
void setFileProgressCallback(FileProgressCallback && callback) { file_progress_callback = callback; }
FileProgressCallback getFileProgressCallback() const { return file_progress_callback; }
/** Set in executeQuery and InterpreterSelectQuery. Then it is used in IBlockInputStream,
* to update and monitor information about the total number of resources spent for the query.

View File

@ -372,7 +372,7 @@ public:
}
/// For clickhouse-local add progress callback to display progress bar.
if (context->needRenderProgress() && context->getApplicationType() == Context::ApplicationType::LOCAL)
if (context->getApplicationType() == Context::ApplicationType::LOCAL)
{
auto & in = static_cast<ReadBufferFromFileDescriptor &>(*nested_buffer);
in.setProgressCallback(context);
@ -498,9 +498,10 @@ Pipe StorageFile::read(
Pipes pipes;
pipes.reserve(num_streams);
/// For clickhouse-local to display progress bar.
if (context->getApplicationType() == Context::ApplicationType::LOCAL)
context->setFileTotalBytesToProcess(total_bytes_to_read);
/// Set total number of bytes to process. For progress bar.
auto progress_callback = context->getFileProgressCallback();
if (context->getApplicationType() == Context::ApplicationType::LOCAL && progress_callback)
progress_callback(FileProgress(0, total_bytes_to_read));
for (size_t i = 0; i < num_streams; ++i)
{