Fix progress

This commit is contained in:
kssenii 2021-08-23 10:13:27 +03:00
parent deeb942ccb
commit 95645e8f26
4 changed files with 47 additions and 25 deletions

View File

@ -90,7 +90,10 @@ void LocalServer::processSingleQuery(const String & query_to_execute, ASTPtr par
/// it needs to be thrown after multiquery is finished (test 00385). But I do not think it is ok to output only
/// first exception or whether we need to even rethrow it because there is --ignore-error.
if (!ignore_error)
{
server_exception.reset();
client_exception.reset();
}
auto process_error = [&]()
{
@ -465,9 +468,7 @@ try
connection_parameters = ConnectionParameters(config());
connection = std::make_unique<LocalConnection>(global_context);
/// Use the same query_id (and thread group) for all queries
connect();
/// TODO: Use the same query_id (and thread group) for all queries
if (is_interactive)
{
@ -480,9 +481,12 @@ try
if (server_exception)
server_exception->rethrow();
if (client_exception)
client_exception->rethrow();
}
connection.reset();
global_context->shutdown();
global_context.reset();

View File

@ -193,7 +193,7 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
if (block.rows() == 0 || (query_fuzzer_runs != 0 && processed_rows >= 100))
return;
if (need_render_progress)
if (need_render_progress && stdout_is_a_tty)
progress_indication.clearProgressOutput();
block_out_stream->write(block);
@ -203,7 +203,7 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
block_out_stream->flush();
/// Restore progress bar after data block.
if (need_render_progress)
if (need_render_progress && stdout_is_a_tty)
progress_indication.writeProgress();
}
@ -487,6 +487,7 @@ void ClientBase::receiveResult(ASTPtr parsed_query)
/// Poll for changes after a cancellation check, otherwise it never reached
/// because of progress updates from server.
if (connection->poll(poll_interval))
break;
}
@ -692,6 +693,16 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
if (!parsed_insert_query)
return;
if (need_render_progress)
{
/// Set total_bytes_to_read for current fd.
FileProgress file_progress(0, std_in.size());
progress_indication.updateProgress(Progress(file_progress));
/// Set callback to be called on file progress.
progress_indication.setFileProgressCallback(global_context, true);
}
if (parsed_insert_query->infile)
{
const auto & in_file_node = parsed_insert_query->infile->as<ASTLiteral &>();
@ -731,22 +742,15 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
}
else if (!is_interactive)
{
if (need_render_progress)
{
/// Add callback to track reading from fd.
std_in.setProgressCallback(global_context);
}
/// Send data read from stdin.
try
{
if (need_render_progress)
{
/// Set total_bytes_to_read for current fd.
FileProgress file_progress(0, std_in.size());
progress_indication.updateProgress(Progress(file_progress));
/// Set callback to be called on file progress.
progress_indication.setFileProgressCallback(global_context, true);
/// Add callback to track reading from fd.
std_in.setProgressCallback(global_context);
}
sendDataFrom(std_in, sample, columns_description, parsed_query);
}
catch (Exception & e)
@ -1225,9 +1229,6 @@ int ClientBase::main(const std::vector<std::string> & /*args*/)
{
UseSSL use_ssl;
std::cout << std::fixed << std::setprecision(3);
std::cerr << std::fixed << std::setprecision(3);
if (is_interactive)
{
clearTerminal();

View File

@ -105,6 +105,8 @@ void LocalConnection::sendQuery(
query_context = session.makeQueryContext();
query_context->makeSessionContext(); /// initial_create_query requires a session context to be set.
query_context->setCurrentQueryId("");
query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
CurrentThread::QueryScope query_scope_holder(query_context);
state->after_send_progress.restart();
@ -113,21 +115,24 @@ void LocalConnection::sendQuery(
try
{
state->io = executeQuery(state->query, query_context, false, state->stage, true);
next_packet_type = Protocol::Server::Data;
if (state->io.out)
{
state->need_receive_data_for_insert = true;
state->io.out->writePrefix();
next_packet_type = Protocol::Server::Data;
state->block = state->io.out->getHeader();
}
else if (state->io.pipeline.initialized())
{
state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
state->block = state->io.pipeline.getHeader();
}
else if (state->io.in)
{
state->async_in = std::make_unique<AsynchronousBlockInputStream>(state->io.in);
state->async_in->readPrefix();
state->block = state->io.in->getHeader();
}
}
catch (const Exception & e)
@ -217,7 +222,7 @@ bool LocalConnection::poll(size_t)
if (!state)
return false;
if (state->after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
if (state->after_send_progress.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay)
{
state->after_send_progress.restart();
next_packet_type = Protocol::Server::Progress;
@ -289,6 +294,13 @@ bool LocalConnection::poll(size_t)
}
}
if (state->is_finished && !state->sent_progress)
{
state->sent_progress = true;
next_packet_type = Protocol::Server::Progress;
return true;
}
if (state->is_finished)
{
finishQuery();
@ -366,8 +378,12 @@ Packet LocalConnection::receivePacket()
+ " from server " + getDescription(), ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
// if (state && state->query_execution_time.elapsed() > static_cast<Float64>(query_context->getSettingsRef().max_execution_time.totalMilliseconds()))
// state->is_finished = true;
if (state)
{
auto max_execution_time = query_context->getSettingsRef().max_execution_time.totalSeconds();
if (max_execution_time && state->query_execution_time.elapsedSeconds() > static_cast<Float64>(max_execution_time))
state->is_finished = true;
}
next_packet_type.reset();
return packet;

View File

@ -45,6 +45,7 @@ struct LocalQueryState
bool sent_totals = false;
bool sent_extremes = false;
bool sent_progress = false;
/// Request requires data from the client (INSERT, but not INSERT SELECT).
bool need_receive_data_for_insert = false;