diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index ce47f503931..8851146f240 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -68,15 +68,16 @@ #include #include -#include -#include -#include #include +#include #include #include #include +#include #include #include +#include +#include #include #include @@ -441,9 +442,15 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query) /// If results are written INTO OUTFILE, we can avoid clearing progress to avoid flicker. if (need_render_progress && tty_buf && (!select_into_file || select_into_file_and_stdout)) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf && (!select_into_file || select_into_file_and_stdout)) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } try { @@ -464,13 +471,15 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query) { if (select_into_file && !select_into_file_and_stdout) error_stream << "\r"; - progress_indication.writeProgress(*tty_buf); + std::unique_lock lock(tty_mutex); + progress_indication.writeProgress(*tty_buf, lock); } if (need_render_progress_table && tty_buf && !cancelled) { if (!need_render_progress && select_into_file && !select_into_file_and_stdout) error_stream << "\r"; - progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), progress_table_toggle_enabled); + std::unique_lock lock(tty_mutex); + progress_table.writeTable(*tty_buf, lock, progress_table_toggle_on.load(), progress_table_toggle_enabled); } } @@ -479,9 +488,15 @@ void ClientBase::onLogData(Block & block) { initLogsOutputStream(); if (need_render_progress && tty_buf) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } logs_out_stream->writeLogs(block); logs_out_stream->flush(); } @@ -1318,7 +1333,10 @@ void ClientBase::onProgress(const Progress & value) output_format->onProgress(value); if (need_render_progress && tty_buf) - progress_indication.writeProgress(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.writeProgress(*tty_buf, lock); + } } void ClientBase::onTimezoneUpdate(const String & tz) @@ -1330,9 +1348,15 @@ void ClientBase::onTimezoneUpdate(const String & tz) void ClientBase::onEndOfStream() { if (need_render_progress && tty_buf) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } if (output_format) { @@ -1414,11 +1438,15 @@ void ClientBase::onProfileEvents(Block & block) progress_table.updateTable(block); if (need_render_progress && tty_buf) - progress_indication.writeProgress(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.writeProgress(*tty_buf, lock); + } if (need_render_progress_table && tty_buf && !cancelled) { bool toggle_enabled = getClientConfiguration().getBool("enable-progress-table-toggle", true); - progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), toggle_enabled); + std::unique_lock lock(tty_mutex); + progress_table.writeTable(*tty_buf, lock, progress_table_toggle_on.load(), toggle_enabled); } if (profile_events.print) @@ -1429,9 +1457,15 @@ void ClientBase::onProfileEvents(Block & block) profile_events.watch.restart(); initLogsOutputStream(); if (need_render_progress && tty_buf) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } logs_out_stream->writeProfileEvents(block); logs_out_stream->flush(); @@ -1450,7 +1484,10 @@ void ClientBase::onProfileEvents(Block & block) void ClientBase::resetOutput() { if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } /// Order is important: format, compression, file @@ -1665,7 +1702,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des /// Set callback to be called on file progress. if (tty_buf) - progress_indication.setFileProgressCallback(client_context, *tty_buf); + progress_indication.setFileProgressCallback(client_context, *tty_buf, tty_mutex); } /// If data fetched from file (maybe compressed file) @@ -1947,9 +1984,15 @@ void ClientBase::cancelQuery() } if (need_render_progress && tty_buf) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } if (is_interactive) output_stream << "Cancelling query." << std::endl; @@ -2112,9 +2155,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin { initLogsOutputStream(); if (need_render_progress && tty_buf) - progress_indication.clearProgressOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_indication.clearProgressOutput(*tty_buf, lock); + } if (need_render_progress_table && tty_buf) - progress_table.clearTableOutput(*tty_buf); + { + std::unique_lock lock(tty_mutex); + progress_table.clearTableOutput(*tty_buf, lock); + } logs_out_stream->writeProfileEvents(profile_events.last_block); logs_out_stream->flush(); diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 6b261714ff6..29d69fd973d 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -325,6 +325,7 @@ protected: /// /dev/tty if accessible or std::cerr - for progress bar. /// We prefer to output progress bar directly to tty to allow user to redirect stdout and stderr and still get the progress indication. std::unique_ptr tty_buf; + std::mutex tty_mutex; String home_path; String history_file; /// Path to a file containing command history. diff --git a/src/Client/ProgressTable.cpp b/src/Client/ProgressTable.cpp index f63935440e4..8a1e60b1b0c 100644 --- a/src/Client/ProgressTable.cpp +++ b/src/Client/ProgressTable.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -192,7 +193,8 @@ void writeWithWidthStrict(Out & out, std::string_view s, size_t width) } -void ProgressTable::writeTable(WriteBufferFromFileDescriptor & message, bool show_table, bool toggle_enabled) +void ProgressTable::writeTable( + WriteBufferFromFileDescriptor & message, std::unique_lock &, bool show_table, bool toggle_enabled) { std::lock_guard lock{mutex}; if (!show_table && toggle_enabled) @@ -360,7 +362,7 @@ void ProgressTable::updateTable(const Block & block) written_first_block = true; } -void ProgressTable::clearTableOutput(WriteBufferFromFileDescriptor & message) +void ProgressTable::clearTableOutput(WriteBufferFromFileDescriptor & message, std::unique_lock &) { message << "\r" << CLEAR_TO_END_OF_SCREEN << SHOW_CURSOR; message.next(); diff --git a/src/Client/ProgressTable.h b/src/Client/ProgressTable.h index f2563d91217..44f91c2e408 100644 --- a/src/Client/ProgressTable.h +++ b/src/Client/ProgressTable.h @@ -27,8 +27,9 @@ public: } /// Write progress table with metrics. - void writeTable(WriteBufferFromFileDescriptor & message, bool show_table, bool toggle_enabled); - void clearTableOutput(WriteBufferFromFileDescriptor & message); + void writeTable(WriteBufferFromFileDescriptor & message, std::unique_lock & message_lock, + bool show_table, bool toggle_enabled); + void clearTableOutput(WriteBufferFromFileDescriptor & message, std::unique_lock & message_lock); void writeFinalTable(); /// Update the metric values. They can be updated from: diff --git a/src/Common/ProgressIndication.cpp b/src/Common/ProgressIndication.cpp index 79c694574b0..8702b0c9aea 100644 --- a/src/Common/ProgressIndication.cpp +++ b/src/Common/ProgressIndication.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -49,12 +50,13 @@ void ProgressIndication::resetProgress() } } -void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message) +void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message, std::mutex & message_mutex) { context->setFileProgressCallback([&](const FileProgress & file_progress) { progress.incrementPiecewiseAtomically(Progress(file_progress)); - writeProgress(message); + std::unique_lock message_lock(message_mutex); + writeProgress(message, message_lock); }); } @@ -113,7 +115,7 @@ void ProgressIndication::writeFinalProgress() output_stream << "\nPeak memory usage: " << formatReadableSizeWithBinarySuffix(peak_memory_usage) << "."; } -void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message) +void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message, std::unique_lock &) { std::lock_guard lock(progress_mutex); @@ -274,7 +276,7 @@ void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message) message.next(); } -void ProgressIndication::clearProgressOutput(WriteBufferFromFileDescriptor & message) +void ProgressIndication::clearProgressOutput(WriteBufferFromFileDescriptor & message, std::unique_lock &) { std::lock_guard lock(progress_mutex); diff --git a/src/Common/ProgressIndication.h b/src/Common/ProgressIndication.h index 61b4ca1b305..6beadff1fc4 100644 --- a/src/Common/ProgressIndication.h +++ b/src/Common/ProgressIndication.h @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -47,8 +48,8 @@ public: } /// Write progress bar. - void writeProgress(WriteBufferFromFileDescriptor & message); - void clearProgressOutput(WriteBufferFromFileDescriptor & message); + void writeProgress(WriteBufferFromFileDescriptor & message, std::unique_lock & message_lock); + void clearProgressOutput(WriteBufferFromFileDescriptor & message, std::unique_lock & message_lock); /// Write summary. void writeFinalProgress(); @@ -67,7 +68,7 @@ public: /// In some cases there is a need to update progress value, when there is no access to progress_inidcation object. /// In this case it is added via context. /// `write_progress_on_update` is needed to write progress for loading files data via pipe in non-interactive mode. - void setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message); + void setFileProgressCallback(ContextMutablePtr context, WriteBufferFromFileDescriptor & message, std::mutex & message_mutex); /// How much seconds passed since query execution start. double elapsedSeconds() const { return getElapsedNanoseconds() / 1e9; }