Render progress directly in tty

This commit is contained in:
Alexey Milovidov 2022-10-01 23:19:36 +02:00
parent b65f674dd4
commit 5d710e21f1
6 changed files with 76 additions and 46 deletions

View File

@ -1025,7 +1025,7 @@ void Client::processConfig()
}
else
{
need_render_progress = config().getBool("progress", false);
need_render_progress = config().getBool("progress", true);
echo_queries = config().getBool("echo", false);
ignore_error = config().getBool("ignore-error", false);

View File

@ -488,7 +488,7 @@ void LocalServer::processConfig()
}
else
{
need_render_progress = config().getBool("progress", false);
need_render_progress = config().getBool("progress", true);
echo_queries = config().hasOption("echo") || config().hasOption("verbose");
ignore_error = config().getBool("ignore-error", false);
is_multiquery = true;

View File

@ -415,7 +415,7 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
/// If results are written INTO OUTFILE, we can avoid clearing progress to avoid flicker.
if (need_render_progress && (stdout_is_a_tty || is_interactive) && (!select_into_file || select_into_file_and_stdout))
progress_indication.clearProgressOutput();
progress_indication.clearProgressOutput(*tty_buf);
try
{
@ -436,7 +436,7 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
{
if (select_into_file && !select_into_file_and_stdout)
std::cerr << "\r";
progress_indication.writeProgress();
progress_indication.writeProgress(*tty_buf);
}
}
@ -444,7 +444,8 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
void ClientBase::onLogData(Block & block)
{
initLogsOutputStream();
progress_indication.clearProgressOutput();
if (need_render_progress)
progress_indication.clearProgressOutput(*tty_buf);
logs_out_stream->writeLogs(block);
logs_out_stream->flush();
}
@ -640,6 +641,33 @@ void ClientBase::initLogsOutputStream()
}
}
void ClientBase::initTtyBuffer()
{
if (!tty_buf)
{
static constexpr auto tty_file_name = "/dev/tty";
/// Output all progress bar commands to stderr at once to avoid flicker.
/// This size is usually greater than the window size.
static constexpr size_t buf_size = 1024;
std::error_code ec;
std::filesystem::file_status tty = std::filesystem::status(tty_file_name, ec);
if (!ec && exists(tty) && is_character_file(tty)
&& (tty.permissions() & std::filesystem::perms::others_write) != std::filesystem::perms::none)
{
tty_buf = std::make_unique<WriteBufferFromFile>(tty_file_name, buf_size);
}
else if (stderr_is_a_tty)
{
tty_buf = std::make_unique<WriteBufferFromFileDescriptor>(STDERR_FILENO, buf_size);
}
else
need_render_progress = false;
}
}
void ClientBase::updateSuggest(const ASTPtr & ast)
{
std::vector<std::string> new_words;
@ -939,13 +967,14 @@ void ClientBase::onProgress(const Progress & value)
output_format->onProgress(value);
if (need_render_progress)
progress_indication.writeProgress();
progress_indication.writeProgress(*tty_buf);
}
void ClientBase::onEndOfStream()
{
progress_indication.clearProgressOutput();
if (need_render_progress)
progress_indication.clearProgressOutput(*tty_buf);
if (output_format)
output_format->finalize();
@ -953,10 +982,7 @@ void ClientBase::onEndOfStream()
resetOutput();
if (is_interactive && !written_first_block)
{
progress_indication.clearProgressOutput();
std::cout << "Ok." << std::endl;
}
}
@ -1000,14 +1026,15 @@ void ClientBase::onProfileEvents(Block & block)
progress_indication.updateThreadEventData(thread_times);
if (need_render_progress)
progress_indication.writeProgress();
progress_indication.writeProgress(*tty_buf);
if (profile_events.print)
{
if (profile_events.watch.elapsedMilliseconds() >= profile_events.delay_ms)
{
initLogsOutputStream();
progress_indication.clearProgressOutput();
if (need_render_progress)
progress_indication.clearProgressOutput(*tty_buf);
logs_out_stream->writeProfileEvents(block);
logs_out_stream->flush();
@ -1181,7 +1208,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
progress_indication.updateProgress(Progress(file_progress));
/// Set callback to be called on file progress.
progress_indication.setFileProgressCallback(global_context, true);
progress_indication.setFileProgressCallback(global_context, *tty_buf);
}
/// If data fetched from file (maybe compressed file)
@ -1433,12 +1460,10 @@ bool ClientBase::receiveEndOfQuery()
void ClientBase::cancelQuery()
{
connection->sendCancel();
if (is_interactive)
{
progress_indication.clearProgressOutput();
std::cout << "Cancelling query." << std::endl;
if (need_render_progress)
progress_indication.clearProgressOutput(*tty_buf);
}
std::cout << "Cancelling query." << std::endl;
cancelled = true;
}
@ -1556,7 +1581,8 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
if (profile_events.last_block)
{
initLogsOutputStream();
progress_indication.clearProgressOutput();
if (need_render_progress)
progress_indication.clearProgressOutput(*tty_buf);
logs_out_stream->writeProfileEvents(profile_events.last_block);
logs_out_stream->flush();
@ -2216,6 +2242,8 @@ void ClientBase::init(int argc, char ** argv)
stdout_is_a_tty = isatty(STDOUT_FILENO);
stderr_is_a_tty = isatty(STDERR_FILENO);
terminal_width = getTerminalWidth();
if (need_render_progress)
initTtyBuffer();
Arguments common_arguments{""}; /// 0th argument is ignored.
std::vector<Arguments> external_tables_arguments;
@ -2243,7 +2271,7 @@ void ClientBase::init(int argc, char ** argv)
("stage", po::value<std::string>()->default_value("complete"), "Request query processing up to specified stage: complete,fetch_columns,with_mergeable_state,with_mergeable_state_after_aggregation,with_mergeable_state_after_aggregation_and_limit")
("query_kind", po::value<std::string>()->default_value("initial_query"), "One of initial_query/secondary_query/no_query")
("query_id", po::value<std::string>(), "query_id")
("progress", "print progress of queries execution")
("progress", po::value<bool>()->implicit_value(true), "print progress of queries execution")
("disable_suggestion,A", "Disable loading suggestion data. Note that suggestion data is loaded asynchronously through a second connection to ClickHouse server. Also it is reasonable to disable suggestion if you want to paste a query with TAB characters. Shorthand option -A is for those who get used to mysql client.")
("time,t", "print query execution time to stderr in non-interactive mode (for benchmarks)")
@ -2348,7 +2376,7 @@ void ClientBase::init(int argc, char ** argv)
if (options.count("profile-events-delay-ms"))
config().setInt("profile-events-delay-ms", options["profile-events-delay-ms"].as<UInt64>());
if (options.count("progress"))
config().setBool("progress", true);
config().setBool("progress", options["progress"].as<bool>());
if (options.count("echo"))
config().setBool("echo", true);
if (options.count("disable_suggestion"))

View File

@ -143,11 +143,11 @@ private:
void initOutputFormat(const Block & block, ASTPtr parsed_query);
void initLogsOutputStream();
void initTtyBuffer();
String prompt() const;
void resetOutput();
void outputQueryInfo(bool echo_query_);
void parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments);
void updateSuggest(const ASTPtr & ast);
@ -219,6 +219,10 @@ protected:
String server_logs_file;
std::unique_ptr<InternalTextLogs> logs_out_stream;
/// /dev/tty if accessible or std::cerr - for progress bar.
/// We prefer to output progress bar directly to tty to allow user to redirect stdout and stderr and still get the progress indication.
std::unique_ptr<WriteBuffer> tty_buf;
String home_path;
String history_file; /// Path to a file containing command history.

View File

@ -2,6 +2,7 @@
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <filesystem>
#include <cmath>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <base/types.h>
@ -44,15 +45,6 @@ bool ProgressIndication::updateProgress(const Progress & value)
return progress.incrementPiecewiseAtomically(value);
}
void ProgressIndication::clearProgressOutput()
{
if (written_progress_chars)
{
written_progress_chars = 0;
std::cerr << "\r" CLEAR_TO_END_OF_LINE;
}
}
void ProgressIndication::resetProgress()
{
watch.restart();
@ -67,15 +59,12 @@ void ProgressIndication::resetProgress()
}
}
void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, bool write_progress_on_update_)
void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, WriteBuffer & message)
{
write_progress_on_update = write_progress_on_update_;
context->setFileProgressCallback([&](const FileProgress & file_progress)
{
progress.incrementPiecewiseAtomically(Progress(file_progress));
if (write_progress_on_update)
writeProgress();
writeProgress(message);
});
}
@ -153,13 +142,10 @@ void ProgressIndication::writeFinalProgress()
std::cout << ". ";
}
void ProgressIndication::writeProgress()
void ProgressIndication::writeProgress(WriteBuffer & message)
{
std::lock_guard lock(progress_mutex);
/// Output all progress bar commands to stderr at once to avoid flicker.
WriteBufferFromFileDescriptor message(STDERR_FILENO, 1024);
static size_t increment = 0;
static const char * indicators[8] = {
"\033[1;30m→\033[0m",
@ -318,4 +304,14 @@ void ProgressIndication::writeProgress()
message.next();
}
void ProgressIndication::clearProgressOutput(WriteBuffer & message)
{
if (written_progress_chars)
{
written_progress_chars = 0;
message << "\r" CLEAR_TO_END_OF_LINE;
message.next();
}
}
}

View File

@ -12,9 +12,12 @@
/// http://en.wikipedia.org/wiki/ANSI_escape_code
#define CLEAR_TO_END_OF_LINE "\033[K"
namespace DB
{
class WriteBuffer;
struct ThreadEventData
{
UInt64 time() const noexcept { return user_ms + system_ms; }
@ -30,14 +33,13 @@ using HostToThreadTimesMap = std::unordered_map<String, ThreadIdToTimeMap>;
class ProgressIndication
{
public:
/// Write progress to stderr.
void writeProgress();
/// Write progress bar.
void writeProgress(WriteBuffer & message);
void clearProgressOutput(WriteBuffer & message);
/// Write summary.
void writeFinalProgress();
/// Clear stderr output.
void clearProgressOutput();
/// Reset progress values.
void resetProgress();
@ -52,7 +54,7 @@ public:
/// In some cases there is a need to update progress value, when there is no access to progress_inidcation object.
/// In this case it is added via context.
/// `write_progress_on_update` is needed to write progress for loading files data via pipe in non-interactive mode.
void setFileProgressCallback(ContextMutablePtr context, bool write_progress_on_update = false);
void setFileProgressCallback(ContextMutablePtr context, WriteBuffer & message);
/// How much seconds passed since query execution start.
double elapsedSeconds() const { return getElapsedNanoseconds() / 1e9; }