fix race between progress indicator and progress table

This commit is contained in:
Julia Kartseva 2024-11-14 02:53:09 +00:00
parent 7007ce7596
commit e5f4ba8017
6 changed files with 89 additions and 33 deletions

View File

@ -68,15 +68,16 @@
#include <Access/AccessControl.h>
#include <Storages/ColumnsDescription.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <iostream>
#include <filesystem>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <string_view>
#include <unordered_map>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <Common/config_version.h>
#include <base/find_symbols.h>
@ -441,9 +442,15 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
/// If results are written INTO OUTFILE, we can avoid clearing progress to avoid flicker.
if (need_render_progress && tty_buf && (!select_into_file || select_into_file_and_stdout))
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf && (!select_into_file || select_into_file_and_stdout))
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
try
{
@ -464,13 +471,15 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
{
if (select_into_file && !select_into_file_and_stdout)
error_stream << "\r";
progress_indication.writeProgress(*tty_buf);
std::unique_lock lock(tty_mutex);
progress_indication.writeProgress(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf && !cancelled)
{
if (!need_render_progress && select_into_file && !select_into_file_and_stdout)
error_stream << "\r";
progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), progress_table_toggle_enabled);
std::unique_lock lock(tty_mutex);
progress_table.writeTable(*tty_buf, lock, progress_table_toggle_on.load(), progress_table_toggle_enabled);
}
}
@ -479,9 +488,15 @@ void ClientBase::onLogData(Block & block)
{
initLogsOutputStream();
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
logs_out_stream->writeLogs(block);
logs_out_stream->flush();
}
@ -1318,7 +1333,10 @@ void ClientBase::onProgress(const Progress & value)
output_format->onProgress(value);
if (need_render_progress && tty_buf)
progress_indication.writeProgress(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.writeProgress(*tty_buf, lock);
}
}
void ClientBase::onTimezoneUpdate(const String & tz)
@ -1330,9 +1348,15 @@ void ClientBase::onTimezoneUpdate(const String & tz)
void ClientBase::onEndOfStream()
{
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
if (output_format)
{
@ -1414,11 +1438,15 @@ void ClientBase::onProfileEvents(Block & block)
progress_table.updateTable(block);
if (need_render_progress && tty_buf)
progress_indication.writeProgress(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.writeProgress(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf && !cancelled)
{
bool toggle_enabled = getClientConfiguration().getBool("enable-progress-table-toggle", true);
progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), toggle_enabled);
std::unique_lock lock(tty_mutex);
progress_table.writeTable(*tty_buf, lock, progress_table_toggle_on.load(), toggle_enabled);
}
if (profile_events.print)
@ -1429,9 +1457,15 @@ void ClientBase::onProfileEvents(Block & block)
profile_events.watch.restart();
initLogsOutputStream();
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
logs_out_stream->writeProfileEvents(block);
logs_out_stream->flush();
@ -1450,7 +1484,10 @@ void ClientBase::onProfileEvents(Block & block)
void ClientBase::resetOutput()
{
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
/// Order is important: format, compression, file
@ -1665,7 +1702,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
/// Set callback to be called on file progress.
if (tty_buf)
progress_indication.setFileProgressCallback(client_context, *tty_buf);
progress_indication.setFileProgressCallback(client_context, *tty_buf, tty_mutex);
}
/// If data fetched from file (maybe compressed file)
@ -1947,9 +1984,15 @@ void ClientBase::cancelQuery()
}
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
if (is_interactive)
output_stream << "Cancelling query." << std::endl;
@ -2112,9 +2155,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
{
initLogsOutputStream();
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
logs_out_stream->writeProfileEvents(profile_events.last_block);
logs_out_stream->flush();

View File

@ -325,6 +325,7 @@ protected:
/// /dev/tty if accessible or std::cerr - for progress bar.
/// We prefer to output progress bar directly to tty to allow user to redirect stdout and stderr and still get the progress indication.
std::unique_ptr<WriteBufferFromFileDescriptor> tty_buf;
std::mutex tty_mutex;
String home_path;
String history_file; /// Path to a file containing command history.

View File

@ -14,6 +14,7 @@
#include <Common/formatReadable.h>
#include <format>
#include <mutex>
#include <numeric>
#include <unordered_map>
@ -192,7 +193,8 @@ void writeWithWidthStrict(Out & out, std::string_view s, size_t width)
}
void ProgressTable::writeTable(WriteBufferFromFileDescriptor & message, bool show_table, bool toggle_enabled)
void ProgressTable::writeTable(
WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> &, bool show_table, bool toggle_enabled)
{
std::lock_guard lock{mutex};
if (!show_table && toggle_enabled)
@ -360,7 +362,7 @@ void ProgressTable::updateTable(const Block & block)
written_first_block = true;
}
void ProgressTable::clearTableOutput(WriteBufferFromFileDescriptor & message)
void ProgressTable::clearTableOutput(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> &)
{
message << "\r" << CLEAR_TO_END_OF_SCREEN << SHOW_CURSOR;
message.next();

View File

@ -27,8 +27,9 @@ public:
}
/// Write progress table with metrics.
void writeTable(WriteBufferFromFileDescriptor & message, bool show_table, bool toggle_enabled);
void clearTableOutput(WriteBufferFromFileDescriptor & message);
void writeTable(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> & message_lock,
bool show_table, bool toggle_enabled);
void clearTableOutput(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> & message_lock);
void writeFinalTable();
/// Update the metric values. They can be updated from:

View File

@ -2,6 +2,7 @@
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <numeric>
#include <filesystem>
#include <cmath>
@ -49,12 +50,13 @@ void ProgressIndication::resetProgress()
}
}
void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message)
void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message, std::mutex & message_mutex)
{
context->setFileProgressCallback([&](const FileProgress & file_progress)
{
progress.incrementPiecewiseAtomically(Progress(file_progress));
writeProgress(message);
std::unique_lock message_lock(message_mutex);
writeProgress(message, message_lock);
});
}
@ -113,7 +115,7 @@ void ProgressIndication::writeFinalProgress()
output_stream << "\nPeak memory usage: " << formatReadableSizeWithBinarySuffix(peak_memory_usage) << ".";
}
void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message)
void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> &)
{
std::lock_guard lock(progress_mutex);
@ -274,7 +276,7 @@ void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message)
message.next();
}
void ProgressIndication::clearProgressOutput(WriteBufferFromFileDescriptor & message)
void ProgressIndication::clearProgressOutput(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> &)
{
std::lock_guard lock(progress_mutex);

View File

@ -8,6 +8,7 @@
#include <iostream>
#include <mutex>
#include <queue>
#include <unordered_map>
#include <unordered_set>
@ -47,8 +48,8 @@ public:
}
/// Write progress bar.
void writeProgress(WriteBufferFromFileDescriptor & message);
void clearProgressOutput(WriteBufferFromFileDescriptor & message);
void writeProgress(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> & message_lock);
void clearProgressOutput(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> & message_lock);
/// Write summary.
void writeFinalProgress();
@ -67,7 +68,7 @@ public:
/// In some cases there is a need to update progress value, when there is no access to progress_inidcation object.
/// In this case it is added via context.
/// `write_progress_on_update` is needed to write progress for loading files data via pipe in non-interactive mode.
void setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message);
void setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message, std::mutex & message_mutex);
/// How much seconds passed since query execution start.
double elapsedSeconds() const { return getElapsedNanoseconds() / 1e9; }