Fix progress for insert select

This commit is contained in:
kssenii 2022-05-06 17:04:03 +02:00
parent 9022500b72
commit 0feda67ec4
18 changed files with 101 additions and 49 deletions

View File

@ -107,7 +107,7 @@ std::vector<String> Client::loadWarningMessages()
connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings", "" /* query_id */,
QueryProcessingStage::Complete,
&global_context->getSettingsRef(),
&global_context->getClientInfo(), false);
&global_context->getClientInfo(), false, {});
while (true)
{
Packet packet = connection->receivePacket();

View File

@ -719,7 +719,8 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
query_processing_stage,
&global_context->getSettingsRef(),
&global_context->getClientInfo(),
true);
true,
[&](const Progress & progress) { onProgress(progress); });
if (send_external_tables)
sendExternalTables(parsed_query);
@ -1072,7 +1073,8 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
query_processing_stage,
&global_context->getSettingsRef(),
&global_context->getClientInfo(),
true);
true,
[&](const Progress & progress) { onProgress(progress); });
if (send_external_tables)
sendExternalTables(parsed_query);
@ -1104,7 +1106,9 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
if (!parsed_insert_query)
return;
if (need_render_progress)
bool have_data_in_stdin = !is_interactive && !stdin_is_a_tty && !std_in.eof();
if (need_render_progress && have_data_in_stdin)
{
/// Set total_bytes_to_read for current fd.
FileProgress file_progress(0, std_in.size());
@ -1114,8 +1118,6 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
progress_indication.setFileProgressCallback(global_context, true);
}
bool have_data_in_stdin = !is_interactive && !stdin_is_a_tty && !std_in.eof();
/// If data fetched from file (maybe compressed file)
if (parsed_insert_query->infile)
{

View File

@ -451,7 +451,8 @@ void Connection::sendQuery(
UInt64 stage,
const Settings * settings,
const ClientInfo * client_info,
bool with_pending_data)
bool with_pending_data,
std::function<void(const Progress &)>)
{
if (!connected)
connect(timeouts);

View File

@ -100,7 +100,8 @@ public:
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings/* = nullptr */,
const ClientInfo * client_info/* = nullptr */,
bool with_pending_data/* = false */) override;
bool with_pending_data/* = false */,
std::function<void(const Progress &)> process_progress_callback) override;
void sendCancel() override;

View File

@ -179,7 +179,7 @@ void HedgedConnections::sendQuery(
modified_settings.parallel_replica_offset = fd_to_replica_location[replica.packet_receiver->getFileDescriptor()].offset;
}
replica.connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data);
replica.connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
replica.change_replica_timeout.setRelative(timeouts.receive_data_timeout);
replica.packet_receiver->setReceiveTimeout(hedged_connections_factory.getConnectionTimeouts().receive_timeout);
};

View File

@ -90,7 +90,8 @@ public:
UInt64 stage,
const Settings * settings,
const ClientInfo * client_info,
bool with_pending_data) = 0;
bool with_pending_data,
std::function<void(const Progress &)> process_progress_callback) = 0;
virtual void sendCancel() = 0;

View File

@ -74,13 +74,14 @@ void LocalConnection::sendQuery(
UInt64 stage,
const Settings *,
const ClientInfo *,
bool)
bool,
std::function<void(const Progress &)> process_progress_callback)
{
query_context = session.makeQueryContext();
query_context->setCurrentQueryId(query_id);
if (send_progress)
{
query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
query_context->setProgressCallback([this] (const Progress & value) { this->updateProgress(value); });
query_context->setFileProgressCallback([this](const FileProgress & value) { this->updateProgress(Progress(value)); });
}
if (!current_database.empty())
@ -143,6 +144,19 @@ void LocalConnection::sendQuery(
else if (state->io.pipeline.completed())
{
CompletedPipelineExecutor executor(state->io.pipeline);
if (process_progress_callback)
{
auto callback = [this, &process_progress_callback]()
{
if (state->is_cancelled)
return true;
process_progress_callback(state->progress.fetchAndResetPiecewiseAtomically());
return false;
};
executor.setCancelCallback(callback, query_context->getSettingsRef().interactive_delay / 1000);
}
executor.execute();
}
@ -185,6 +199,7 @@ void LocalConnection::sendData(const Block & block, const String &, bool)
void LocalConnection::sendCancel()
{
state->is_cancelled = true;
if (state->executor)
state->executor->cancel();
}
@ -440,7 +455,7 @@ Packet LocalConnection::receivePacket()
}
case Protocol::Server::Progress:
{
packet.progress = std::move(state->progress);
packet.progress = state->progress.fetchAndResetPiecewiseAtomically();
state->progress.reset();
next_packet_type.reset();
break;

View File

@ -98,7 +98,8 @@ public:
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings/* = nullptr */,
const ClientInfo * client_info/* = nullptr */,
bool with_pending_data/* = false */) override;
bool with_pending_data/* = false */,
std::function<void(const Progress &)> process_progress_callback) override;
void sendCancel() override;

View File

@ -161,14 +161,14 @@ void MultiplexedConnections::sendQuery(
modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(timeouts, query, query_id,
stage, &modified_settings, &client_info, with_pending_data);
stage, &modified_settings, &client_info, with_pending_data, {});
}
}
else
{
/// Use single replica.
replica_states[0].connection->sendQuery(timeouts, query, query_id,
stage, &modified_settings, &client_info, with_pending_data);
stage, &modified_settings, &client_info, with_pending_data, {});
}
sent_query = true;

View File

@ -132,7 +132,7 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
{
connection.sendQuery(timeouts, query, "" /* query_id */, QueryProcessingStage::Complete, nullptr, nullptr, false);
connection.sendQuery(timeouts, query, "" /* query_id */, QueryProcessingStage::Complete, nullptr, nullptr, false, {});
while (true)
{

View File

@ -165,18 +165,17 @@ void ProgressIndication::writeProgress()
message << '\r';
size_t prefix_size = message.count();
size_t read_bytes = progress.read_raw_bytes ? progress.read_raw_bytes : progress.read_bytes;
message << indicator << " Progress: ";
message
<< formatReadableQuantity(progress.read_rows) << " rows, "
<< formatReadableSizeWithDecimalSuffix(read_bytes);
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes);
auto elapsed_ns = watch.elapsed();
if (elapsed_ns)
message << " ("
<< formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., "
<< formatReadableSizeWithDecimalSuffix(read_bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
else
message << ". ";
@ -206,7 +205,7 @@ void ProgressIndication::writeProgress()
int64_t remaining_space = static_cast<int64_t>(terminal_width) - written_progress_chars;
/// If the approximate number of rows to process is known, we can display a progress bar and percentage.
if (progress.total_rows_to_read || progress.total_raw_bytes_to_read)
if (progress.total_rows_to_read || progress.total_bytes_to_read)
{
size_t current_count, max_count;
if (progress.total_rows_to_read)
@ -216,8 +215,8 @@ void ProgressIndication::writeProgress()
}
else
{
current_count = progress.read_raw_bytes;
max_count = std::max(progress.read_raw_bytes, progress.total_raw_bytes_to_read);
current_count = progress.read_bytes;
max_count = std::max(progress.read_bytes, progress.total_bytes_to_read);
}
/// To avoid flicker, display progress bar only if .5 seconds have passed since query execution start

View File

@ -68,10 +68,12 @@ bool Progress::incrementPiecewiseAtomically(const Progress & rhs)
{
read_rows += rhs.read_rows;
read_bytes += rhs.read_bytes;
read_raw_bytes += rhs.read_raw_bytes;
total_rows_to_read += rhs.total_rows_to_read;
total_raw_bytes_to_read += rhs.total_raw_bytes_to_read;
if (rhs.total_rows_to_read)
total_rows_to_read.store(rhs.total_rows_to_read);
if (rhs.total_bytes_to_read)
total_bytes_to_read.store(rhs.total_bytes_to_read);
written_rows += rhs.written_rows;
written_bytes += rhs.written_bytes;
@ -83,10 +85,9 @@ void Progress::reset()
{
read_rows = 0;
read_bytes = 0;
read_raw_bytes = 0;
total_rows_to_read = 0;
total_raw_bytes_to_read = 0;
total_bytes_to_read = 0;
written_rows = 0;
written_bytes = 0;
@ -98,10 +99,9 @@ ProgressValues Progress::getValues() const
res.read_rows = read_rows.load(std::memory_order_relaxed);
res.read_bytes = read_bytes.load(std::memory_order_relaxed);
res.read_raw_bytes = read_raw_bytes.load(std::memory_order_relaxed);
res.total_rows_to_read = total_rows_to_read.load(std::memory_order_relaxed);
res.total_raw_bytes_to_read = total_raw_bytes_to_read.load(std::memory_order_relaxed);
res.total_bytes_to_read = total_bytes_to_read.load(std::memory_order_relaxed);
res.written_rows = written_rows.load(std::memory_order_relaxed);
res.written_bytes = written_bytes.load(std::memory_order_relaxed);
@ -109,16 +109,31 @@ ProgressValues Progress::getValues() const
return res;
}
ProgressValues Progress::fetchAndResetPiecewiseAtomically()
ProgressValues Progress::fetchValuesAndResetPiecewiseAtomically()
{
ProgressValues res;
res.read_rows = read_rows.fetch_and(0);
res.read_bytes = read_bytes.fetch_and(0);
res.read_raw_bytes = read_raw_bytes.fetch_and(0);
res.total_rows_to_read = total_rows_to_read.fetch_and(0);
res.total_raw_bytes_to_read = total_raw_bytes_to_read.fetch_and(0);
res.total_bytes_to_read = total_bytes_to_read.fetch_and(0);
res.written_rows = written_rows.fetch_and(0);
res.written_bytes = written_bytes.fetch_and(0);
return res;
}
Progress Progress::fetchAndResetPiecewiseAtomically()
{
Progress res;
res.read_rows = read_rows.fetch_and(0);
res.read_bytes = read_bytes.fetch_and(0);
res.total_rows_to_read = total_rows_to_read.fetch_and(0);
res.total_bytes_to_read = total_bytes_to_read.fetch_and(0);
res.written_rows = written_rows.fetch_and(0);
res.written_bytes = written_bytes.fetch_and(0);
@ -130,10 +145,9 @@ Progress & Progress::operator=(Progress && other) noexcept
{
read_rows = other.read_rows.load(std::memory_order_relaxed);
read_bytes = other.read_bytes.load(std::memory_order_relaxed);
read_raw_bytes = other.read_raw_bytes.load(std::memory_order_relaxed);
total_rows_to_read = other.total_rows_to_read.load(std::memory_order_relaxed);
total_raw_bytes_to_read = other.total_raw_bytes_to_read.load(std::memory_order_relaxed);
total_bytes_to_read = other.total_bytes_to_read.load(std::memory_order_relaxed);
written_rows = other.written_rows.load(std::memory_order_relaxed);
written_bytes = other.written_bytes.load(std::memory_order_relaxed);
@ -148,7 +162,10 @@ void Progress::read(ReadBuffer & in, UInt64 server_revision)
read_rows.store(values.read_rows, std::memory_order_relaxed);
read_bytes.store(values.read_bytes, std::memory_order_relaxed);
total_rows_to_read.store(values.total_rows_to_read, std::memory_order_relaxed);
total_bytes_to_read.store(values.total_bytes_to_read, std::memory_order_relaxed);
written_rows.store(values.written_rows, std::memory_order_relaxed);
written_bytes.store(values.written_bytes, std::memory_order_relaxed);
}

View File

@ -18,10 +18,9 @@ struct ProgressValues
{
size_t read_rows;
size_t read_bytes;
size_t read_raw_bytes;
size_t total_rows_to_read;
size_t total_raw_bytes_to_read;
size_t total_bytes_to_read;
size_t written_rows;
size_t written_bytes;
@ -68,15 +67,12 @@ struct Progress
{
std::atomic<size_t> read_rows {0}; /// Rows (source) processed.
std::atomic<size_t> read_bytes {0}; /// Bytes (uncompressed, source) processed.
std::atomic<size_t> read_raw_bytes {0}; /// Raw bytes processed.
/** How much rows/bytes must be processed, in total, approximately. Non-zero value is sent when there is information about
* some new part of job. Received values must be summed to get estimate of total rows to process.
* `total_raw_bytes_to_process` is used for file table engine or when reading from file descriptor.
* Used for rendering progress bar on client.
*/
std::atomic<size_t> total_rows_to_read {0};
std::atomic<size_t> total_raw_bytes_to_read {0};
std::atomic<size_t> total_bytes_to_read {0};
std::atomic<size_t> written_rows {0};
std::atomic<size_t> written_bytes {0};
@ -93,7 +89,7 @@ struct Progress
: written_rows(write_progress.written_rows), written_bytes(write_progress.written_bytes) {}
explicit Progress(FileProgress file_progress)
: read_raw_bytes(file_progress.read_bytes), total_raw_bytes_to_read(file_progress.total_bytes_to_read) {}
: read_bytes(file_progress.read_bytes), total_bytes_to_read(file_progress.total_bytes_to_read) {}
void read(ReadBuffer & in, UInt64 server_revision);
@ -109,7 +105,9 @@ struct Progress
ProgressValues getValues() const;
ProgressValues fetchAndResetPiecewiseAtomically();
ProgressValues fetchValuesAndResetPiecewiseAtomically();
Progress fetchAndResetPiecewiseAtomically();
Progress & operator=(Progress && other) noexcept;

View File

@ -42,6 +42,7 @@ public:
/// Set the approximate total number of rows to read.
virtual void addTotalRowsApprox(size_t value) = 0;
virtual void setTotalRowsApprox(size_t value) = 0;
};
/// Implementation for ISourceWithProgress
@ -58,6 +59,7 @@ public:
void setProcessListElement(QueryStatus * elem) final;
void setProgressCallback(const ProgressCallback & callback) final { progress_callback = callback; }
void addTotalRowsApprox(size_t value) final { total_rows_approx += value; }
void setTotalRowsApprox(size_t value) final { total_rows_approx = value; }
protected:
/// Call this method to provide information about progress.

View File

@ -50,7 +50,7 @@ RemoteInserter::RemoteInserter(
/** Send query and receive "header", that describes table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.
*/
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &settings_, &modified_client_info, false);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &settings_, &modified_client_info, false, {});
while (true)
{

View File

@ -1482,7 +1482,7 @@ namespace
void Call::addProgressToResult()
{
auto values = progress.fetchAndResetPiecewiseAtomically();
auto values = progress.fetchValuesAndResetPiecewiseAtomically();
if (!values.read_rows && !values.read_bytes && !values.total_rows_to_read && !values.written_rows && !values.written_bytes)
return;
auto & grpc_progress = *result.mutable_progress();

View File

@ -1329,6 +1329,7 @@ void TCPHandler::receiveQuery()
query_context->getIgnoredPartUUIDs()->add(*state.part_uuids_to_ignore);
query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
query_context->setFileProgressCallback([this](const FileProgress & value) { this->updateProgress(Progress(value)); });
///
/// Settings
@ -1736,7 +1737,7 @@ void TCPHandler::updateProgress(const Progress & value)
void TCPHandler::sendProgress()
{
writeVarUInt(Protocol::Server::Progress, *out);
auto increment = state.progress.fetchAndResetPiecewiseAtomically();
auto increment = state.progress.fetchValuesAndResetPiecewiseAtomically();
increment.write(*out, client_tcp_protocol_version);
out->next();
}

View File

@ -433,6 +433,8 @@ public:
bool need_path_column = false;
bool need_file_column = false;
size_t total_bytes_to_read = 0;
};
using FilesInfoPtr = std::shared_ptr<FilesInfo>;
@ -573,6 +575,15 @@ public:
chunk.addColumn(column->convertToFullColumnIfConst());
}
if (num_rows)
{
auto bytes_per_row = std::ceil(static_cast<double>(chunk.bytes()) / num_rows);
size_t total_rows_approx = std::ceil(static_cast<double>(files_info->total_bytes_to_read) / bytes_per_row);
total_rows_approx_accumulated += total_rows_approx;
++total_rows_count_times;
total_rows_approx = total_rows_approx_accumulated / total_rows_count_times;
setTotalRowsApprox(total_rows_approx);
}
return chunk;
}
@ -608,6 +619,9 @@ private:
bool finished_generate = false;
std::shared_lock<std::shared_timed_mutex> shared_lock;
UInt64 total_rows_approx_accumulated = 0;
size_t total_rows_count_times = 0;
};
@ -635,6 +649,7 @@ Pipe StorageFile::read(
auto files_info = std::make_shared<StorageFileSource::FilesInfo>();
files_info->files = paths;
files_info->total_bytes_to_read = total_bytes_to_read;
for (const auto & column : column_names)
{
@ -654,9 +669,8 @@ Pipe StorageFile::read(
/// Set total number of bytes to process. For progress bar.
auto progress_callback = context->getFileProgressCallback();
if ((context->getApplicationType() == Context::ApplicationType::LOCAL
|| context->getApplicationType() == Context::ApplicationType::CLIENT)
&& progress_callback)
if (progress_callback)
progress_callback(FileProgress(0, total_bytes_to_read));
for (size_t i = 0; i < num_streams; ++i)