From 0feda67ec4f30e1dff8cd80564c6f3cd99da5276 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 6 May 2022 17:04:03 +0200 Subject: [PATCH] Fix progress for insert select --- programs/client/Client.cpp | 2 +- src/Client/ClientBase.cpp | 12 +++--- src/Client/Connection.cpp | 3 +- src/Client/Connection.h | 3 +- src/Client/HedgedConnections.cpp | 2 +- src/Client/IServerConnection.h | 3 +- src/Client/LocalConnection.cpp | 21 +++++++++-- src/Client/LocalConnection.h | 3 +- src/Client/MultiplexedConnections.cpp | 4 +- src/Client/Suggest.cpp | 2 +- src/Common/ProgressIndication.cpp | 11 +++--- src/IO/Progress.cpp | 41 +++++++++++++++------ src/IO/Progress.h | 14 +++---- src/Processors/Sources/SourceWithProgress.h | 2 + src/QueryPipeline/RemoteInserter.cpp | 2 +- src/Server/GRPCServer.cpp | 2 +- src/Server/TCPHandler.cpp | 3 +- src/Storages/StorageFile.cpp | 20 ++++++++-- 18 files changed, 101 insertions(+), 49 deletions(-) diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index d8636484cba..660ef7a5de1 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -107,7 +107,7 @@ std::vector Client::loadWarningMessages() connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings", "" /* query_id */, QueryProcessingStage::Complete, &global_context->getSettingsRef(), - &global_context->getClientInfo(), false); + &global_context->getClientInfo(), false, {}); while (true) { Packet packet = connection->receivePacket(); diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index def74fa0ec9..86a6f64b6ff 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -719,7 +719,8 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa query_processing_stage, &global_context->getSettingsRef(), &global_context->getClientInfo(), - true); + true, + [&](const Progress & progress) { onProgress(progress); }); if (send_external_tables) sendExternalTables(parsed_query); @@ -1072,7 +1073,8 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars query_processing_stage, &global_context->getSettingsRef(), &global_context->getClientInfo(), - true); + true, + [&](const Progress & progress) { onProgress(progress); }); if (send_external_tables) sendExternalTables(parsed_query); @@ -1104,7 +1106,9 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des if (!parsed_insert_query) return; - if (need_render_progress) + bool have_data_in_stdin = !is_interactive && !stdin_is_a_tty && !std_in.eof(); + + if (need_render_progress && have_data_in_stdin) { /// Set total_bytes_to_read for current fd. FileProgress file_progress(0, std_in.size()); @@ -1114,8 +1118,6 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des progress_indication.setFileProgressCallback(global_context, true); } - bool have_data_in_stdin = !is_interactive && !stdin_is_a_tty && !std_in.eof(); - /// If data fetched from file (maybe compressed file) if (parsed_insert_query->infile) { diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index 021ae2142a1..e53d55f6964 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -451,7 +451,8 @@ void Connection::sendQuery( UInt64 stage, const Settings * settings, const ClientInfo * client_info, - bool with_pending_data) + bool with_pending_data, + std::function) { if (!connected) connect(timeouts); diff --git a/src/Client/Connection.h b/src/Client/Connection.h index e34a0a22f42..d00a5760a8d 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -100,7 +100,8 @@ public: UInt64 stage/* = QueryProcessingStage::Complete */, const Settings * settings/* = nullptr */, const ClientInfo * client_info/* = nullptr */, - bool with_pending_data/* = false */) override; + bool with_pending_data/* = false */, + std::function process_progress_callback) override; void sendCancel() override; diff --git a/src/Client/HedgedConnections.cpp b/src/Client/HedgedConnections.cpp index 665f98a88d6..954396af0fa 100644 --- a/src/Client/HedgedConnections.cpp +++ b/src/Client/HedgedConnections.cpp @@ -179,7 +179,7 @@ void HedgedConnections::sendQuery( modified_settings.parallel_replica_offset = fd_to_replica_location[replica.packet_receiver->getFileDescriptor()].offset; } - replica.connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data); + replica.connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data, {}); replica.change_replica_timeout.setRelative(timeouts.receive_data_timeout); replica.packet_receiver->setReceiveTimeout(hedged_connections_factory.getConnectionTimeouts().receive_timeout); }; diff --git a/src/Client/IServerConnection.h b/src/Client/IServerConnection.h index 861630a942b..dfa4873e426 100644 --- a/src/Client/IServerConnection.h +++ b/src/Client/IServerConnection.h @@ -90,7 +90,8 @@ public: UInt64 stage, const Settings * settings, const ClientInfo * client_info, - bool with_pending_data) = 0; + bool with_pending_data, + std::function process_progress_callback) = 0; virtual void sendCancel() = 0; diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp index 49e40ef8571..77519423763 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -74,13 +74,14 @@ void LocalConnection::sendQuery( UInt64 stage, const Settings *, const ClientInfo *, - bool) + bool, + std::function process_progress_callback) { query_context = session.makeQueryContext(); query_context->setCurrentQueryId(query_id); if (send_progress) { - query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); }); + query_context->setProgressCallback([this] (const Progress & value) { this->updateProgress(value); }); query_context->setFileProgressCallback([this](const FileProgress & value) { this->updateProgress(Progress(value)); }); } if (!current_database.empty()) @@ -143,6 +144,19 @@ void LocalConnection::sendQuery( else if (state->io.pipeline.completed()) { CompletedPipelineExecutor executor(state->io.pipeline); + if (process_progress_callback) + { + auto callback = [this, &process_progress_callback]() + { + if (state->is_cancelled) + return true; + + process_progress_callback(state->progress.fetchAndResetPiecewiseAtomically()); + return false; + }; + + executor.setCancelCallback(callback, query_context->getSettingsRef().interactive_delay / 1000); + } executor.execute(); } @@ -185,6 +199,7 @@ void LocalConnection::sendData(const Block & block, const String &, bool) void LocalConnection::sendCancel() { + state->is_cancelled = true; if (state->executor) state->executor->cancel(); } @@ -440,7 +455,7 @@ Packet LocalConnection::receivePacket() } case Protocol::Server::Progress: { - packet.progress = std::move(state->progress); + packet.progress = state->progress.fetchAndResetPiecewiseAtomically(); state->progress.reset(); next_packet_type.reset(); break; diff --git a/src/Client/LocalConnection.h b/src/Client/LocalConnection.h index ad6f94122cc..1ad6ad73238 100644 --- a/src/Client/LocalConnection.h +++ b/src/Client/LocalConnection.h @@ -98,7 +98,8 @@ public: UInt64 stage/* = QueryProcessingStage::Complete */, const Settings * settings/* = nullptr */, const ClientInfo * client_info/* = nullptr */, - bool with_pending_data/* = false */) override; + bool with_pending_data/* = false */, + std::function process_progress_callback) override; void sendCancel() override; diff --git a/src/Client/MultiplexedConnections.cpp b/src/Client/MultiplexedConnections.cpp index 31fbc609bdc..b14ff9f2c8d 100644 --- a/src/Client/MultiplexedConnections.cpp +++ b/src/Client/MultiplexedConnections.cpp @@ -161,14 +161,14 @@ void MultiplexedConnections::sendQuery( modified_settings.parallel_replica_offset = i; replica_states[i].connection->sendQuery(timeouts, query, query_id, - stage, &modified_settings, &client_info, with_pending_data); + stage, &modified_settings, &client_info, with_pending_data, {}); } } else { /// Use single replica. replica_states[0].connection->sendQuery(timeouts, query, query_id, - stage, &modified_settings, &client_info, with_pending_data); + stage, &modified_settings, &client_info, with_pending_data, {}); } sent_query = true; diff --git a/src/Client/Suggest.cpp b/src/Client/Suggest.cpp index 84625a768bf..de09c07f4c1 100644 --- a/src/Client/Suggest.cpp +++ b/src/Client/Suggest.cpp @@ -132,7 +132,7 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query) { - connection.sendQuery(timeouts, query, "" /* query_id */, QueryProcessingStage::Complete, nullptr, nullptr, false); + connection.sendQuery(timeouts, query, "" /* query_id */, QueryProcessingStage::Complete, nullptr, nullptr, false, {}); while (true) { diff --git a/src/Common/ProgressIndication.cpp b/src/Common/ProgressIndication.cpp index 7a8cff2d58f..ffc90807060 100644 --- a/src/Common/ProgressIndication.cpp +++ b/src/Common/ProgressIndication.cpp @@ -165,18 +165,17 @@ void ProgressIndication::writeProgress() message << '\r'; size_t prefix_size = message.count(); - size_t read_bytes = progress.read_raw_bytes ? progress.read_raw_bytes : progress.read_bytes; message << indicator << " Progress: "; message << formatReadableQuantity(progress.read_rows) << " rows, " - << formatReadableSizeWithDecimalSuffix(read_bytes); + << formatReadableSizeWithDecimalSuffix(progress.read_bytes); auto elapsed_ns = watch.elapsed(); if (elapsed_ns) message << " (" << formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., " - << formatReadableSizeWithDecimalSuffix(read_bytes * 1000000000.0 / elapsed_ns) << "/s.) "; + << formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.) "; else message << ". "; @@ -206,7 +205,7 @@ void ProgressIndication::writeProgress() int64_t remaining_space = static_cast(terminal_width) - written_progress_chars; /// If the approximate number of rows to process is known, we can display a progress bar and percentage. - if (progress.total_rows_to_read || progress.total_raw_bytes_to_read) + if (progress.total_rows_to_read || progress.total_bytes_to_read) { size_t current_count, max_count; if (progress.total_rows_to_read) @@ -216,8 +215,8 @@ void ProgressIndication::writeProgress() } else { - current_count = progress.read_raw_bytes; - max_count = std::max(progress.read_raw_bytes, progress.total_raw_bytes_to_read); + current_count = progress.read_bytes; + max_count = std::max(progress.read_bytes, progress.total_bytes_to_read); } /// To avoid flicker, display progress bar only if .5 seconds have passed since query execution start diff --git a/src/IO/Progress.cpp b/src/IO/Progress.cpp index 1d16f54de7b..29cf6c52d13 100644 --- a/src/IO/Progress.cpp +++ b/src/IO/Progress.cpp @@ -68,10 +68,12 @@ bool Progress::incrementPiecewiseAtomically(const Progress & rhs) { read_rows += rhs.read_rows; read_bytes += rhs.read_bytes; - read_raw_bytes += rhs.read_raw_bytes; - total_rows_to_read += rhs.total_rows_to_read; - total_raw_bytes_to_read += rhs.total_raw_bytes_to_read; + if (rhs.total_rows_to_read) + total_rows_to_read.store(rhs.total_rows_to_read); + + if (rhs.total_bytes_to_read) + total_bytes_to_read.store(rhs.total_bytes_to_read); written_rows += rhs.written_rows; written_bytes += rhs.written_bytes; @@ -83,10 +85,9 @@ void Progress::reset() { read_rows = 0; read_bytes = 0; - read_raw_bytes = 0; total_rows_to_read = 0; - total_raw_bytes_to_read = 0; + total_bytes_to_read = 0; written_rows = 0; written_bytes = 0; @@ -98,10 +99,9 @@ ProgressValues Progress::getValues() const res.read_rows = read_rows.load(std::memory_order_relaxed); res.read_bytes = read_bytes.load(std::memory_order_relaxed); - res.read_raw_bytes = read_raw_bytes.load(std::memory_order_relaxed); res.total_rows_to_read = total_rows_to_read.load(std::memory_order_relaxed); - res.total_raw_bytes_to_read = total_raw_bytes_to_read.load(std::memory_order_relaxed); + res.total_bytes_to_read = total_bytes_to_read.load(std::memory_order_relaxed); res.written_rows = written_rows.load(std::memory_order_relaxed); res.written_bytes = written_bytes.load(std::memory_order_relaxed); @@ -109,16 +109,31 @@ ProgressValues Progress::getValues() const return res; } -ProgressValues Progress::fetchAndResetPiecewiseAtomically() +ProgressValues Progress::fetchValuesAndResetPiecewiseAtomically() { ProgressValues res; res.read_rows = read_rows.fetch_and(0); res.read_bytes = read_bytes.fetch_and(0); - res.read_raw_bytes = read_raw_bytes.fetch_and(0); res.total_rows_to_read = total_rows_to_read.fetch_and(0); - res.total_raw_bytes_to_read = total_raw_bytes_to_read.fetch_and(0); + res.total_bytes_to_read = total_bytes_to_read.fetch_and(0); + + res.written_rows = written_rows.fetch_and(0); + res.written_bytes = written_bytes.fetch_and(0); + + return res; +} + +Progress Progress::fetchAndResetPiecewiseAtomically() +{ + Progress res; + + res.read_rows = read_rows.fetch_and(0); + res.read_bytes = read_bytes.fetch_and(0); + + res.total_rows_to_read = total_rows_to_read.fetch_and(0); + res.total_bytes_to_read = total_bytes_to_read.fetch_and(0); res.written_rows = written_rows.fetch_and(0); res.written_bytes = written_bytes.fetch_and(0); @@ -130,10 +145,9 @@ Progress & Progress::operator=(Progress && other) noexcept { read_rows = other.read_rows.load(std::memory_order_relaxed); read_bytes = other.read_bytes.load(std::memory_order_relaxed); - read_raw_bytes = other.read_raw_bytes.load(std::memory_order_relaxed); total_rows_to_read = other.total_rows_to_read.load(std::memory_order_relaxed); - total_raw_bytes_to_read = other.total_raw_bytes_to_read.load(std::memory_order_relaxed); + total_bytes_to_read = other.total_bytes_to_read.load(std::memory_order_relaxed); written_rows = other.written_rows.load(std::memory_order_relaxed); written_bytes = other.written_bytes.load(std::memory_order_relaxed); @@ -148,7 +162,10 @@ void Progress::read(ReadBuffer & in, UInt64 server_revision) read_rows.store(values.read_rows, std::memory_order_relaxed); read_bytes.store(values.read_bytes, std::memory_order_relaxed); + total_rows_to_read.store(values.total_rows_to_read, std::memory_order_relaxed); + total_bytes_to_read.store(values.total_bytes_to_read, std::memory_order_relaxed); + written_rows.store(values.written_rows, std::memory_order_relaxed); written_bytes.store(values.written_bytes, std::memory_order_relaxed); } diff --git a/src/IO/Progress.h b/src/IO/Progress.h index 4f1a3df0ffd..f04822f26bb 100644 --- a/src/IO/Progress.h +++ b/src/IO/Progress.h @@ -18,10 +18,9 @@ struct ProgressValues { size_t read_rows; size_t read_bytes; - size_t read_raw_bytes; size_t total_rows_to_read; - size_t total_raw_bytes_to_read; + size_t total_bytes_to_read; size_t written_rows; size_t written_bytes; @@ -68,15 +67,12 @@ struct Progress { std::atomic read_rows {0}; /// Rows (source) processed. std::atomic read_bytes {0}; /// Bytes (uncompressed, source) processed. - std::atomic read_raw_bytes {0}; /// Raw bytes processed. /** How much rows/bytes must be processed, in total, approximately. Non-zero value is sent when there is information about * some new part of job. Received values must be summed to get estimate of total rows to process. - * `total_raw_bytes_to_process` is used for file table engine or when reading from file descriptor. - * Used for rendering progress bar on client. */ std::atomic total_rows_to_read {0}; - std::atomic total_raw_bytes_to_read {0}; + std::atomic total_bytes_to_read {0}; std::atomic written_rows {0}; std::atomic written_bytes {0}; @@ -93,7 +89,7 @@ struct Progress : written_rows(write_progress.written_rows), written_bytes(write_progress.written_bytes) {} explicit Progress(FileProgress file_progress) - : read_raw_bytes(file_progress.read_bytes), total_raw_bytes_to_read(file_progress.total_bytes_to_read) {} + : read_bytes(file_progress.read_bytes), total_bytes_to_read(file_progress.total_bytes_to_read) {} void read(ReadBuffer & in, UInt64 server_revision); @@ -109,7 +105,9 @@ struct Progress ProgressValues getValues() const; - ProgressValues fetchAndResetPiecewiseAtomically(); + ProgressValues fetchValuesAndResetPiecewiseAtomically(); + + Progress fetchAndResetPiecewiseAtomically(); Progress & operator=(Progress && other) noexcept; diff --git a/src/Processors/Sources/SourceWithProgress.h b/src/Processors/Sources/SourceWithProgress.h index 912a548f977..57002006957 100644 --- a/src/Processors/Sources/SourceWithProgress.h +++ b/src/Processors/Sources/SourceWithProgress.h @@ -42,6 +42,7 @@ public: /// Set the approximate total number of rows to read. virtual void addTotalRowsApprox(size_t value) = 0; + virtual void setTotalRowsApprox(size_t value) = 0; }; /// Implementation for ISourceWithProgress @@ -58,6 +59,7 @@ public: void setProcessListElement(QueryStatus * elem) final; void setProgressCallback(const ProgressCallback & callback) final { progress_callback = callback; } void addTotalRowsApprox(size_t value) final { total_rows_approx += value; } + void setTotalRowsApprox(size_t value) final { total_rows_approx = value; } protected: /// Call this method to provide information about progress. diff --git a/src/QueryPipeline/RemoteInserter.cpp b/src/QueryPipeline/RemoteInserter.cpp index aec7562e133..d5cef72b020 100644 --- a/src/QueryPipeline/RemoteInserter.cpp +++ b/src/QueryPipeline/RemoteInserter.cpp @@ -50,7 +50,7 @@ RemoteInserter::RemoteInserter( /** Send query and receive "header", that describes table structure. * Header is needed to know, what structure is required for blocks to be passed to 'write' method. */ - connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &settings_, &modified_client_info, false); + connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &settings_, &modified_client_info, false, {}); while (true) { diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp index 99ee1b7f7f3..e60b85ee7a3 100644 --- a/src/Server/GRPCServer.cpp +++ b/src/Server/GRPCServer.cpp @@ -1482,7 +1482,7 @@ namespace void Call::addProgressToResult() { - auto values = progress.fetchAndResetPiecewiseAtomically(); + auto values = progress.fetchValuesAndResetPiecewiseAtomically(); if (!values.read_rows && !values.read_bytes && !values.total_rows_to_read && !values.written_rows && !values.written_bytes) return; auto & grpc_progress = *result.mutable_progress(); diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index a038dcb3e6c..c71b3834726 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1329,6 +1329,7 @@ void TCPHandler::receiveQuery() query_context->getIgnoredPartUUIDs()->add(*state.part_uuids_to_ignore); query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); }); + query_context->setFileProgressCallback([this](const FileProgress & value) { this->updateProgress(Progress(value)); }); /// /// Settings @@ -1736,7 +1737,7 @@ void TCPHandler::updateProgress(const Progress & value) void TCPHandler::sendProgress() { writeVarUInt(Protocol::Server::Progress, *out); - auto increment = state.progress.fetchAndResetPiecewiseAtomically(); + auto increment = state.progress.fetchValuesAndResetPiecewiseAtomically(); increment.write(*out, client_tcp_protocol_version); out->next(); } diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index c460b8a4c67..e177a8ffd76 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -433,6 +433,8 @@ public: bool need_path_column = false; bool need_file_column = false; + + size_t total_bytes_to_read = 0; }; using FilesInfoPtr = std::shared_ptr; @@ -573,6 +575,15 @@ public: chunk.addColumn(column->convertToFullColumnIfConst()); } + if (num_rows) + { + auto bytes_per_row = std::ceil(static_cast(chunk.bytes()) / num_rows); + size_t total_rows_approx = std::ceil(static_cast(files_info->total_bytes_to_read) / bytes_per_row); + total_rows_approx_accumulated += total_rows_approx; + ++total_rows_count_times; + total_rows_approx = total_rows_approx_accumulated / total_rows_count_times; + setTotalRowsApprox(total_rows_approx); + } return chunk; } @@ -608,6 +619,9 @@ private: bool finished_generate = false; std::shared_lock shared_lock; + + UInt64 total_rows_approx_accumulated = 0; + size_t total_rows_count_times = 0; }; @@ -635,6 +649,7 @@ Pipe StorageFile::read( auto files_info = std::make_shared(); files_info->files = paths; + files_info->total_bytes_to_read = total_bytes_to_read; for (const auto & column : column_names) { @@ -654,9 +669,8 @@ Pipe StorageFile::read( /// Set total number of bytes to process. For progress bar. auto progress_callback = context->getFileProgressCallback(); - if ((context->getApplicationType() == Context::ApplicationType::LOCAL - || context->getApplicationType() == Context::ApplicationType::CLIENT) - && progress_callback) + + if (progress_callback) progress_callback(FileProgress(0, total_bytes_to_read)); for (size_t i = 0; i < num_streams; ++i)