Merge pull request #71901 from jkartseva/fix-infile-interactive-metrics

Fix data race between progress indicator and progress table in clickhouse-client
This commit is contained in:
Julia Kartseva 2024-11-15 02:16:12 +00:00 committed by GitHub
commit 3c0f299148
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 130 additions and 61 deletions

View File

@ -68,15 +68,16 @@
#include <Access/AccessControl.h>
#include <Storages/ColumnsDescription.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <iostream>
#include <filesystem>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <string_view>
#include <unordered_map>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <Common/config_version.h>
#include <base/find_symbols.h>
@ -441,9 +442,15 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
/// If results are written INTO OUTFILE, we can avoid clearing progress to avoid flicker.
if (need_render_progress && tty_buf && (!select_into_file || select_into_file_and_stdout))
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf && (!select_into_file || select_into_file_and_stdout))
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
try
{
@ -464,13 +471,15 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
{
if (select_into_file && !select_into_file_and_stdout)
error_stream << "\r";
progress_indication.writeProgress(*tty_buf);
std::unique_lock lock(tty_mutex);
progress_indication.writeProgress(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf && !cancelled)
{
if (!need_render_progress && select_into_file && !select_into_file_and_stdout)
error_stream << "\r";
progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), progress_table_toggle_enabled);
std::unique_lock lock(tty_mutex);
progress_table.writeTable(*tty_buf, lock, progress_table_toggle_on.load(), progress_table_toggle_enabled);
}
}
@ -479,9 +488,15 @@ void ClientBase::onLogData(Block & block)
{
initLogsOutputStream();
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
logs_out_stream->writeLogs(block);
logs_out_stream->flush();
}
@ -1151,34 +1166,8 @@ void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, b
std::exception_ptr local_format_error;
if (keystroke_interceptor)
{
progress_table_toggle_on = false;
try
{
keystroke_interceptor->startIntercept();
}
catch (const DB::Exception &)
{
error_stream << getCurrentExceptionMessage(false);
keystroke_interceptor.reset();
}
}
SCOPE_EXIT({
if (keystroke_interceptor)
{
try
{
keystroke_interceptor->stopIntercept();
}
catch (...)
{
error_stream << getCurrentExceptionMessage(false);
keystroke_interceptor.reset();
}
}
});
startKeystrokeInterceptorIfExists();
SCOPE_EXIT({ stopKeystrokeInterceptorIfExists(); });
while (true)
{
@ -1318,7 +1307,10 @@ void ClientBase::onProgress(const Progress & value)
output_format->onProgress(value);
if (need_render_progress && tty_buf)
progress_indication.writeProgress(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.writeProgress(*tty_buf, lock);
}
}
void ClientBase::onTimezoneUpdate(const String & tz)
@ -1330,9 +1322,15 @@ void ClientBase::onTimezoneUpdate(const String & tz)
void ClientBase::onEndOfStream()
{
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
if (output_format)
{
@ -1414,11 +1412,15 @@ void ClientBase::onProfileEvents(Block & block)
progress_table.updateTable(block);
if (need_render_progress && tty_buf)
progress_indication.writeProgress(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.writeProgress(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf && !cancelled)
{
bool toggle_enabled = getClientConfiguration().getBool("enable-progress-table-toggle", true);
progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), toggle_enabled);
std::unique_lock lock(tty_mutex);
progress_table.writeTable(*tty_buf, lock, progress_table_toggle_on.load(), toggle_enabled);
}
if (profile_events.print)
@ -1429,9 +1431,15 @@ void ClientBase::onProfileEvents(Block & block)
profile_events.watch.restart();
initLogsOutputStream();
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
logs_out_stream->writeProfileEvents(block);
logs_out_stream->flush();
@ -1450,7 +1458,10 @@ void ClientBase::onProfileEvents(Block & block)
void ClientBase::resetOutput()
{
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
/// Order is important: format, compression, file
@ -1619,6 +1630,9 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
if (send_external_tables)
sendExternalTables(parsed_query);
startKeystrokeInterceptorIfExists();
SCOPE_EXIT({ stopKeystrokeInterceptorIfExists(); });
/// Receive description of table structure.
Block sample;
ColumnsDescription columns_description;
@ -1665,7 +1679,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
/// Set callback to be called on file progress.
if (tty_buf)
progress_indication.setFileProgressCallback(client_context, *tty_buf);
progress_indication.setFileProgressCallback(client_context, *tty_buf, tty_mutex);
}
/// If data fetched from file (maybe compressed file)
@ -1947,9 +1961,15 @@ void ClientBase::cancelQuery()
}
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
if (is_interactive)
output_stream << "Cancelling query." << std::endl;
@ -2112,9 +2132,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
{
initLogsOutputStream();
if (need_render_progress && tty_buf)
progress_indication.clearProgressOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_indication.clearProgressOutput(*tty_buf, lock);
}
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
{
std::unique_lock lock(tty_mutex);
progress_table.clearTableOutput(*tty_buf, lock);
}
logs_out_stream->writeProfileEvents(profile_events.last_block);
logs_out_stream->flush();
@ -2613,6 +2639,39 @@ bool ClientBase::addMergeTreeSettings(ASTCreateQuery & ast_create)
return added_new_setting;
}
void ClientBase::startKeystrokeInterceptorIfExists()
{
if (keystroke_interceptor)
{
progress_table_toggle_on = false;
try
{
keystroke_interceptor->startIntercept();
}
catch (const DB::Exception &)
{
error_stream << getCurrentExceptionMessage(false);
keystroke_interceptor.reset();
}
}
}
void ClientBase::stopKeystrokeInterceptorIfExists()
{
if (keystroke_interceptor)
{
try
{
keystroke_interceptor->stopIntercept();
}
catch (...)
{
error_stream << getCurrentExceptionMessage(false);
keystroke_interceptor.reset();
}
}
}
void ClientBase::runInteractive()
{
if (getClientConfiguration().has("query_id"))

View File

@ -208,6 +208,9 @@ private:
void initQueryIdFormats();
bool addMergeTreeSettings(ASTCreateQuery & ast_create);
void startKeystrokeInterceptorIfExists();
void stopKeystrokeInterceptorIfExists();
protected:
class QueryInterruptHandler : private boost::noncopyable
@ -325,6 +328,7 @@ protected:
/// /dev/tty if accessible or std::cerr - for progress bar.
/// We prefer to output progress bar directly to tty to allow user to redirect stdout and stderr and still get the progress indication.
std::unique_ptr<WriteBufferFromFileDescriptor> tty_buf;
std::mutex tty_mutex;
String home_path;
String history_file; /// Path to a file containing command history.

View File

@ -14,6 +14,7 @@
#include <Common/formatReadable.h>
#include <format>
#include <mutex>
#include <numeric>
#include <unordered_map>
@ -192,7 +193,8 @@ void writeWithWidthStrict(Out & out, std::string_view s, size_t width)
}
void ProgressTable::writeTable(WriteBufferFromFileDescriptor & message, bool show_table, bool toggle_enabled)
void ProgressTable::writeTable(
WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> &, bool show_table, bool toggle_enabled)
{
std::lock_guard lock{mutex};
if (!show_table && toggle_enabled)
@ -360,7 +362,7 @@ void ProgressTable::updateTable(const Block & block)
written_first_block = true;
}
void ProgressTable::clearTableOutput(WriteBufferFromFileDescriptor & message)
void ProgressTable::clearTableOutput(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> &)
{
message << "\r" << CLEAR_TO_END_OF_SCREEN << SHOW_CURSOR;
message.next();

View File

@ -27,8 +27,9 @@ public:
}
/// Write progress table with metrics.
void writeTable(WriteBufferFromFileDescriptor & message, bool show_table, bool toggle_enabled);
void clearTableOutput(WriteBufferFromFileDescriptor & message);
void writeTable(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> & message_lock,
bool show_table, bool toggle_enabled);
void clearTableOutput(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> & message_lock);
void writeFinalTable();
/// Update the metric values. They can be updated from:

View File

@ -2,6 +2,7 @@
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <numeric>
#include <filesystem>
#include <cmath>
@ -49,12 +50,13 @@ void ProgressIndication::resetProgress()
}
}
void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message)
void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message, std::mutex & message_mutex)
{
context->setFileProgressCallback([&](const FileProgress & file_progress)
{
progress.incrementPiecewiseAtomically(Progress(file_progress));
writeProgress(message);
std::unique_lock message_lock(message_mutex);
writeProgress(message, message_lock);
});
}
@ -113,7 +115,7 @@ void ProgressIndication::writeFinalProgress()
output_stream << "\nPeak memory usage: " << formatReadableSizeWithBinarySuffix(peak_memory_usage) << ".";
}
void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message)
void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> &)
{
std::lock_guard lock(progress_mutex);
@ -274,7 +276,7 @@ void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message)
message.next();
}
void ProgressIndication::clearProgressOutput(WriteBufferFromFileDescriptor & message)
void ProgressIndication::clearProgressOutput(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> &)
{
std::lock_guard lock(progress_mutex);

View File

@ -8,6 +8,7 @@
#include <iostream>
#include <mutex>
#include <queue>
#include <unordered_map>
#include <unordered_set>
@ -47,8 +48,8 @@ public:
}
/// Write progress bar.
void writeProgress(WriteBufferFromFileDescriptor & message);
void clearProgressOutput(WriteBufferFromFileDescriptor & message);
void writeProgress(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> & message_lock);
void clearProgressOutput(WriteBufferFromFileDescriptor & message, std::unique_lock<std::mutex> & message_lock);
/// Write summary.
void writeFinalProgress();
@ -67,7 +68,7 @@ public:
/// In some cases there is a need to update progress value, when there is no access to progress_inidcation object.
/// In this case it is added via context.
/// `write_progress_on_update` is needed to write progress for loading files data via pipe in non-interactive mode.
void setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message);
void setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message, std::mutex & message_mutex);
/// How much seconds passed since query execution start.
double elapsedSeconds() const { return getElapsedNanoseconds() / 1e9; }