From e5f4ba8017fad93f42ec63d3f837b61837c31978 Mon Sep 17 00:00:00 2001 From: Julia Kartseva Date: Thu, 14 Nov 2024 02:53:09 +0000 Subject: [PATCH 1/2] fix race between progress indicator and progress table --- src/Client/ClientBase.cpp | 93 +++++++++++++++++++++++-------- src/Client/ClientBase.h | 1 + src/Client/ProgressTable.cpp | 6 +- src/Client/ProgressTable.h | 5 +- src/Common/ProgressIndication.cpp | 10 ++-- src/Common/ProgressIndication.h | 7 ++- 6 files changed, 89 insertions(+), 33 deletions(-) diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index ce47f503931..8851146f240 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -68,15 +68,16 @@ #include #include -#include -#include -#include #include +#include #include #include #include +#include #include #include +#include +#include #include #include @@ -441,9 +442,15 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query) /// If results are written INTO OUTFILE, we can avoid clearing progress to avoid flicker. if (need_render_progress && tty_buf && (!select_into_file || select_into_file_and_stdout)) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf && (!select_into_file || select_into_file_and_stdout)) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } try { @@ -464,13 +471,15 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query) { if (select_into_file && !select_into_file_and_stdout) error_stream << "\r"; - progress_indication.writeProgress(*tty_buf); + std::unique_lock lock(tty_mutex); + progress_indication.writeProgress(*tty_buf, lock); } if (need_render_progress_table && tty_buf && !cancelled) { if (!need_render_progress && select_into_file && !select_into_file_and_stdout) error_stream << "\r"; - progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), progress_table_toggle_enabled); + std::unique_lock lock(tty_mutex); + progress_table.writeTable(*tty_buf, lock, progress_table_toggle_on.load(), progress_table_toggle_enabled); } } @@ -479,9 +488,15 @@ void ClientBase::onLogData(Block & block) { initLogsOutputStream(); if (need_render_progress && tty_buf) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } logs_out_stream->writeLogs(block); logs_out_stream->flush(); } @@ -1318,7 +1333,10 @@ void ClientBase::onProgress(const Progress & value) output_format->onProgress(value); if (need_render_progress && tty_buf) - progress_indication.writeProgress(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.writeProgress(*tty_buf, lock); + } } void ClientBase::onTimezoneUpdate(const String & tz) @@ -1330,9 +1348,15 @@ void ClientBase::onTimezoneUpdate(const String & tz) void ClientBase::onEndOfStream() { if (need_render_progress && tty_buf) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } if (output_format) { @@ -1414,11 +1438,15 @@ void ClientBase::onProfileEvents(Block & block) progress_table.updateTable(block); if (need_render_progress && tty_buf) - progress_indication.writeProgress(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.writeProgress(*tty_buf, lock); + } if (need_render_progress_table && tty_buf && !cancelled) { bool toggle_enabled = getClientConfiguration().getBool("enable-progress-table-toggle", true); - progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), toggle_enabled); + std::unique_lock lock(tty_mutex); + progress_table.writeTable(*tty_buf, lock, progress_table_toggle_on.load(), toggle_enabled); } if (profile_events.print) @@ -1429,9 +1457,15 @@ void ClientBase::onProfileEvents(Block & block) profile_events.watch.restart(); initLogsOutputStream(); if (need_render_progress && tty_buf) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } logs_out_stream->writeProfileEvents(block); logs_out_stream->flush(); @@ -1450,7 +1484,10 @@ void ClientBase::onProfileEvents(Block & block) void ClientBase::resetOutput() { if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } /// Order is important: format, compression, file @@ -1665,7 +1702,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des /// Set callback to be called on file progress. if (tty_buf) - progress_indication.setFileProgressCallback(client_context, *tty_buf); + progress_indication.setFileProgressCallback(client_context, *tty_buf, tty_mutex); } /// If data fetched from file (maybe compressed file) @@ -1947,9 +1984,15 @@ void ClientBase::cancelQuery() } if (need_render_progress && tty_buf) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } if (is_interactive) output_stream << "Cancelling query." << std::endl; @@ -2112,9 +2155,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin { initLogsOutputStream(); if (need_render_progress && tty_buf) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } logs_out_stream->writeProfileEvents(profile_events.last_block); logs_out_stream->flush(); diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 6b261714ff6..29d69fd973d 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -325,6 +325,7 @@ protected: /// /dev/tty if accessible or std::cerr - for progress bar. /// We prefer to output progress bar directly to tty to allow user to redirect stdout and stderr and still get the progress indication. std::unique_ptr tty_buf; + std::mutex tty_mutex; String home_path; String history_file; /// Path to a file containing command history. diff --git a/src/Client/ProgressTable.cpp b/src/Client/ProgressTable.cpp index f63935440e4..8a1e60b1b0c 100644 --- a/src/Client/ProgressTable.cpp +++ b/src/Client/ProgressTable.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -192,7 +193,8 @@ void writeWithWidthStrict(Out & out, std::string_view s, size_t width) } -void ProgressTable::writeTable(WriteBufferFromFileDescriptor & message, bool show_table, bool toggle_enabled) +void ProgressTable::writeTable( + WriteBufferFromFileDescriptor & message, std::unique_lock &, bool show_table, bool toggle_enabled) { std::lock_guard lock{mutex}; if (!show_table && toggle_enabled) @@ -360,7 +362,7 @@ void ProgressTable::updateTable(const Block & block) written_first_block = true; } -void ProgressTable::clearTableOutput(WriteBufferFromFileDescriptor & message) +void ProgressTable::clearTableOutput(WriteBufferFromFileDescriptor & message, std::unique_lock &) { message << "\r" << CLEAR_TO_END_OF_SCREEN << SHOW_CURSOR; message.next(); diff --git a/src/Client/ProgressTable.h b/src/Client/ProgressTable.h index f2563d91217..44f91c2e408 100644 --- a/src/Client/ProgressTable.h +++ b/src/Client/ProgressTable.h @@ -27,8 +27,9 @@ public: } /// Write progress table with metrics. - void writeTable(WriteBufferFromFileDescriptor & message, bool show_table, bool toggle_enabled); - void clearTableOutput(WriteBufferFromFileDescriptor & message); + void writeTable(WriteBufferFromFileDescriptor & message, std::unique_lock & message_lock, + bool show_table, bool toggle_enabled); + void clearTableOutput(WriteBufferFromFileDescriptor & message, std::unique_lock & message_lock); void writeFinalTable(); /// Update the metric values. They can be updated from: diff --git a/src/Common/ProgressIndication.cpp b/src/Common/ProgressIndication.cpp index 79c694574b0..8702b0c9aea 100644 --- a/src/Common/ProgressIndication.cpp +++ b/src/Common/ProgressIndication.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -49,12 +50,13 @@ void ProgressIndication::resetProgress() } } -void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message) +void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message, std::mutex & message_mutex) { context->setFileProgressCallback([&](const FileProgress & file_progress) { progress.incrementPiecewiseAtomically(Progress(file_progress)); - writeProgress(message); + std::unique_lock message_lock(message_mutex); + writeProgress(message, message_lock); }); } @@ -113,7 +115,7 @@ void ProgressIndication::writeFinalProgress() output_stream << "\nPeak memory usage: " << formatReadableSizeWithBinarySuffix(peak_memory_usage) << "."; } -void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message) +void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message, std::unique_lock &) { std::lock_guard lock(progress_mutex); @@ -274,7 +276,7 @@ void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message) message.next(); } -void ProgressIndication::clearProgressOutput(WriteBufferFromFileDescriptor & message) +void ProgressIndication::clearProgressOutput(WriteBufferFromFileDescriptor & message, std::unique_lock &) { std::lock_guard lock(progress_mutex); diff --git a/src/Common/ProgressIndication.h b/src/Common/ProgressIndication.h index 61b4ca1b305..6beadff1fc4 100644 --- a/src/Common/ProgressIndication.h +++ b/src/Common/ProgressIndication.h @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -47,8 +48,8 @@ public: } /// Write progress bar. - void writeProgress(WriteBufferFromFileDescriptor & message); - void clearProgressOutput(WriteBufferFromFileDescriptor & message); + void writeProgress(WriteBufferFromFileDescriptor & message, std::unique_lock & message_lock); + void clearProgressOutput(WriteBufferFromFileDescriptor & message, std::unique_lock & message_lock); /// Write summary. void writeFinalProgress(); @@ -67,7 +68,7 @@ public: /// In some cases there is a need to update progress value, when there is no access to progress_inidcation object. /// In this case it is added via context. /// `write_progress_on_update` is needed to write progress for loading files data via pipe in non-interactive mode. - void setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message); + void setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message, std::mutex & message_mutex); /// How much seconds passed since query execution start. double elapsedSeconds() const { return getElapsedNanoseconds() / 1e9; } From 26aaa87a916083fd4ad48ed4fa3c38ec435835f2 Mon Sep 17 00:00:00 2001 From: Julia Kartseva Date: Thu, 14 Nov 2024 03:16:04 +0000 Subject: [PATCH 2/2] intercept keystrokes for INSERT queries, too --- src/Client/ClientBase.cpp | 66 ++++++++++++++++++++++----------------- src/Client/ClientBase.h | 3 ++ 2 files changed, 41 insertions(+), 28 deletions(-) diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 8851146f240..90120e009f0 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -1166,34 +1166,8 @@ void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, b std::exception_ptr local_format_error; - if (keystroke_interceptor) - { - progress_table_toggle_on = false; - try - { - keystroke_interceptor->startIntercept(); - } - catch (const DB::Exception &) - { - error_stream << getCurrentExceptionMessage(false); - keystroke_interceptor.reset(); - } - } - - SCOPE_EXIT({ - if (keystroke_interceptor) - { - try - { - keystroke_interceptor->stopIntercept(); - } - catch (...) - { - error_stream << getCurrentExceptionMessage(false); - keystroke_interceptor.reset(); - } - } - }); + startKeystrokeInterceptorIfExists(); + SCOPE_EXIT({ stopKeystrokeInterceptorIfExists(); }); while (true) { @@ -1656,6 +1630,9 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars if (send_external_tables) sendExternalTables(parsed_query); + startKeystrokeInterceptorIfExists(); + SCOPE_EXIT({ stopKeystrokeInterceptorIfExists(); }); + /// Receive description of table structure. Block sample; ColumnsDescription columns_description; @@ -2662,6 +2639,39 @@ bool ClientBase::addMergeTreeSettings(ASTCreateQuery & ast_create) return added_new_setting; } +void ClientBase::startKeystrokeInterceptorIfExists() +{ + if (keystroke_interceptor) + { + progress_table_toggle_on = false; + try + { + keystroke_interceptor->startIntercept(); + } + catch (const DB::Exception &) + { + error_stream << getCurrentExceptionMessage(false); + keystroke_interceptor.reset(); + } + } +} + +void ClientBase::stopKeystrokeInterceptorIfExists() +{ + if (keystroke_interceptor) + { + try + { + keystroke_interceptor->stopIntercept(); + } + catch (...) + { + error_stream << getCurrentExceptionMessage(false); + keystroke_interceptor.reset(); + } + } +} + void ClientBase::runInteractive() { if (getClientConfiguration().has("query_id")) diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 29d69fd973d..7660efff582 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -208,6 +208,9 @@ private: void initQueryIdFormats(); bool addMergeTreeSettings(ASTCreateQuery & ast_create); + void startKeystrokeInterceptorIfExists(); + void stopKeystrokeInterceptorIfExists(); + protected: class QueryInterruptHandler : private boost::noncopyable