Merge pull request #5116 from PerformanceVision/send_header

Send an X-ClickHouse-Summary header to HTTP clients with the number of rows inserted
Commit 461c4919d3 by alexey-milovidov, 2019-05-25 20:20:35 +03:00, committed by GitHub
18 changed files with 234 additions and 130 deletions
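
The user-visible change: in addition to the per-interval X-ClickHouse-Progress header, the server now sends a final X-ClickHouse-Summary header whose JSON carries written_rows/written_bytes for INSERTs. A usage sketch based on the test added at the bottom of this diff (a server on the default HTTP port 8123 and the throwaway table t are assumptions for illustration):

# Sketch: observe the new header on an INSERT over HTTP (assumes localhost:8123).
curl -sS "http://localhost:8123/" -d 'CREATE TABLE t (n UInt32) ENGINE = Memory'
# -v prints response headers; the summary arrives once the insert finishes.
curl -vsS "http://localhost:8123/?send_progress_in_http_headers=1&http_headers_progress_interval_ms=0" \
    -d 'INSERT INTO t SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep 'X-ClickHouse-Summary'
# < X-ClickHouse-Summary: {"read_rows":"10","read_bytes":"80","written_rows":"10","written_bytes":"40","total_rows_to_read":"0"}
curl -sS "http://localhost:8123/" -d 'DROP TABLE t'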

View File

@@ -325,8 +325,8 @@ private:
double seconds = watch.elapsedSeconds();
std::lock_guard lock(mutex);
- info_per_interval.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
- info_total.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
+ info_per_interval.add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
+ info_total.add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
}

View File

@@ -866,7 +866,7 @@ private:
std::cout << std::endl
<< processed_rows << " rows in set. Elapsed: " << watch.elapsedSeconds() << " sec. ";
- if (progress.rows >= 1000)
+ if (progress.read_rows >= 1000)
writeFinalProgress();
std::cout << std::endl << std::endl;
@@ -1420,23 +1420,23 @@ private:
<< " Progress: ";
message
- << formatReadableQuantity(progress.rows) << " rows, "
- << formatReadableSizeWithDecimalSuffix(progress.bytes);
+ << formatReadableQuantity(progress.read_rows) << " rows, "
+ << formatReadableSizeWithDecimalSuffix(progress.read_bytes);
size_t elapsed_ns = watch.elapsed();
if (elapsed_ns)
message << " ("
- << formatReadableQuantity(progress.rows * 1000000000.0 / elapsed_ns) << " rows/s., "
- << formatReadableSizeWithDecimalSuffix(progress.bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
+ << formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., "
+ << formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
else
message << ". ";
written_progress_chars = message.count() - prefix_size - (increment % 8 == 7 ? 10 : 13); /// Don't count invisible output (escape sequences).
/// If the approximate number of rows to process is known, we can display a progress bar and percentage.
- if (progress.total_rows > 0)
+ if (progress.total_rows_to_read > 0)
{
- size_t total_rows_corrected = std::max(progress.rows, progress.total_rows);
+ size_t total_rows_corrected = std::max(progress.read_rows, progress.total_rows_to_read);
/// To avoid flicker, display progress bar only if .5 seconds have passed since query execution start
/// and the query is less than halfway done.
@@ -1444,7 +1444,7 @@ private:
if (elapsed_ns > 500000000)
{
/// Trigger to start displaying progress bar. If query is mostly done, don't display it.
- if (progress.rows * 2 < total_rows_corrected)
+ if (progress.read_rows * 2 < total_rows_corrected)
show_progress_bar = true;
if (show_progress_bar)
@@ -1452,7 +1452,7 @@ private:
ssize_t width_of_progress_bar = static_cast<ssize_t>(terminal_size.ws_col) - written_progress_chars - strlen(" 99%");
if (width_of_progress_bar > 0)
{
- std::string bar = UnicodeBar::render(UnicodeBar::getWidth(progress.rows, 0, total_rows_corrected, width_of_progress_bar));
+ std::string bar = UnicodeBar::render(UnicodeBar::getWidth(progress.read_rows, 0, total_rows_corrected, width_of_progress_bar));
message << "\033[0;32m" << bar << "\033[0m";
if (width_of_progress_bar > static_cast<ssize_t>(bar.size() / UNICODE_BAR_CHAR_SIZE))
message << std::string(width_of_progress_bar - bar.size() / UNICODE_BAR_CHAR_SIZE, ' ');
@@ -1461,7 +1461,7 @@ private:
}
/// Underestimate percentage a bit to avoid displaying 100%.
- message << ' ' << (99 * progress.rows / total_rows_corrected) << '%';
+ message << ' ' << (99 * progress.read_rows / total_rows_corrected) << '%';
}
message << ENABLE_LINE_WRAPPING;
@@ -1474,14 +1474,14 @@ private:
void writeFinalProgress()
{
std::cout << "Processed "
- << formatReadableQuantity(progress.rows) << " rows, "
- << formatReadableSizeWithDecimalSuffix(progress.bytes);
+ << formatReadableQuantity(progress.read_rows) << " rows, "
+ << formatReadableSizeWithDecimalSuffix(progress.read_bytes);
size_t elapsed_ns = watch.elapsed();
if (elapsed_ns)
std::cout << " ("
- << formatReadableQuantity(progress.rows * 1000000000.0 / elapsed_ns) << " rows/s., "
- << formatReadableSizeWithDecimalSuffix(progress.bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
+ << formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., "
+ << formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
else
std::cout << ". ";
}

View File

@@ -13,7 +13,7 @@ void checkFulfilledConditionsAndUpdate(
TestStats & statistics, TestStopConditions & stop_conditions,
InterruptListener & interrupt_listener)
{
- statistics.add(progress.rows, progress.bytes);
+ statistics.add(progress.read_rows, progress.read_bytes);
stop_conditions.reportRowsRead(statistics.total_rows_read);
stop_conditions.reportBytesReadUncompressed(statistics.total_bytes_read);

View File

@@ -56,6 +56,8 @@
#define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405
+ #define DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO 54421
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54226

View File

@@ -19,8 +19,8 @@ void CountingBlockOutputStream::write(const Block & block)
Progress local_progress(block.rows(), block.bytes(), 0);
progress.incrementPiecewiseAtomically(local_progress);
- ProfileEvents::increment(ProfileEvents::InsertedRows, local_progress.rows);
- ProfileEvents::increment(ProfileEvents::InsertedBytes, local_progress.bytes);
+ ProfileEvents::increment(ProfileEvents::InsertedRows, local_progress.read_rows);
+ ProfileEvents::increment(ProfileEvents::InsertedBytes, local_progress.read_bytes);
if (process_elem)
process_elem->updateProgressOut(local_progress);

View File

@@ -281,7 +281,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
/// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
ProgressValues progress = process_list_elem->getProgressIn();
- size_t total_rows_estimate = std::max(progress.rows, progress.total_rows);
+ size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read);
/** Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
* NOTE: Maybe it makes sense to have them checked directly in ProcessList?
@@ -289,7 +289,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
if (limits.mode == LIMITS_TOTAL
&& ((limits.size_limits.max_rows && total_rows_estimate > limits.size_limits.max_rows)
- || (limits.size_limits.max_bytes && progress.bytes > limits.size_limits.max_bytes)))
+ || (limits.size_limits.max_bytes && progress.read_bytes > limits.size_limits.max_bytes)))
{
switch (limits.size_limits.overflow_mode)
{
@@ -300,7 +300,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
+ " rows read (or to read), maximum: " + toString(limits.size_limits.max_rows),
ErrorCodes::TOO_MANY_ROWS);
else
- throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(progress.bytes)
+ throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(progress.read_bytes)
+ " bytes read, maximum: " + toString(limits.size_limits.max_bytes),
ErrorCodes::TOO_MANY_BYTES);
}
@@ -308,8 +308,8 @@ void IBlockInputStream::progressImpl(const Progress & value)
case OverflowMode::BREAK:
{
/// For `break`, we will stop only if so many rows were actually read, and not just supposed to be read.
- if ((limits.size_limits.max_rows && progress.rows > limits.size_limits.max_rows)
- || (limits.size_limits.max_bytes && progress.bytes > limits.size_limits.max_bytes))
+ if ((limits.size_limits.max_rows && progress.read_rows > limits.size_limits.max_rows)
+ || (limits.size_limits.max_bytes && progress.read_bytes > limits.size_limits.max_bytes))
{
cancel(false);
}
@@ -322,7 +322,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
}
}
- size_t total_rows = progress.total_rows;
+ size_t total_rows = progress.total_rows_to_read;
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
UInt64 total_elapsed_microseconds = info.total_stopwatch.elapsedMicroseconds();
@@ -344,20 +344,20 @@ void IBlockInputStream::progressImpl(const Progress & value)
if (elapsed_seconds > 0)
{
- if (limits.min_execution_speed && progress.rows / elapsed_seconds < limits.min_execution_speed)
- throw Exception("Query is executing too slow: " + toString(progress.rows / elapsed_seconds)
+ if (limits.min_execution_speed && progress.read_rows / elapsed_seconds < limits.min_execution_speed)
+ throw Exception("Query is executing too slow: " + toString(progress.read_rows / elapsed_seconds)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
- if (limits.min_execution_speed_bytes && progress.bytes / elapsed_seconds < limits.min_execution_speed_bytes)
- throw Exception("Query is executing too slow: " + toString(progress.bytes / elapsed_seconds)
+ if (limits.min_execution_speed_bytes && progress.read_bytes / elapsed_seconds < limits.min_execution_speed_bytes)
+ throw Exception("Query is executing too slow: " + toString(progress.read_bytes / elapsed_seconds)
+ " bytes/sec., minimum: " + toString(limits.min_execution_speed_bytes),
ErrorCodes::TOO_SLOW);
/// If the predicted execution time is longer than `max_execution_time`.
if (limits.max_execution_time != 0 && total_rows)
{
- double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows) / progress.rows);
+ double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows) / progress.read_rows);
if (estimated_execution_time_seconds > limits.max_execution_time.totalSeconds())
throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)"
@@ -366,17 +366,17 @@ void IBlockInputStream::progressImpl(const Progress & value)
ErrorCodes::TOO_SLOW);
}
- if (limits.max_execution_speed && progress.rows / elapsed_seconds >= limits.max_execution_speed)
- limitProgressingSpeed(progress.rows, limits.max_execution_speed, total_elapsed_microseconds);
+ if (limits.max_execution_speed && progress.read_rows / elapsed_seconds >= limits.max_execution_speed)
+ limitProgressingSpeed(progress.read_rows, limits.max_execution_speed, total_elapsed_microseconds);
- if (limits.max_execution_speed_bytes && progress.bytes / elapsed_seconds >= limits.max_execution_speed_bytes)
- limitProgressingSpeed(progress.bytes, limits.max_execution_speed_bytes, total_elapsed_microseconds);
+ if (limits.max_execution_speed_bytes && progress.read_bytes / elapsed_seconds >= limits.max_execution_speed_bytes)
+ limitProgressingSpeed(progress.read_bytes, limits.max_execution_speed_bytes, total_elapsed_microseconds);
}
}
if (quota != nullptr && limits.mode == LIMITS_TOTAL)
{
- quota->checkAndAddReadRowsBytes(time(nullptr), value.rows, value.bytes);
+ quota->checkAndAddReadRowsBytes(time(nullptr), value.read_rows, value.read_bytes);
}
}
}

View File

@@ -220,10 +220,10 @@ void JSONRowOutputStream::writeStatistics()
writeText(watch.elapsedSeconds(), *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\"rows_read\": ", *ostr);
- writeText(progress.rows.load(), *ostr);
+ writeText(progress.read_rows.load(), *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\"bytes_read\": ", *ostr);
- writeText(progress.bytes.load(), *ostr);
+ writeText(progress.read_bytes.load(), *ostr);
writeChar('\n', *ostr);
writeCString("\t}", *ostr);

View File

@@ -215,10 +215,10 @@ void XMLRowOutputStream::writeStatistics()
writeText(watch.elapsedSeconds(), *ostr);
writeCString("</elapsed>\n", *ostr);
writeCString("\t\t<rows_read>", *ostr);
- writeText(progress.rows.load(), *ostr);
+ writeText(progress.read_rows.load(), *ostr);
writeCString("</rows_read>\n", *ostr);
writeCString("\t\t<bytes_read>", *ostr);
- writeText(progress.bytes.load(), *ostr);
+ writeText(progress.read_bytes.load(), *ostr);
writeCString("</bytes_read>\n", *ostr);
writeCString("\t</statistics>\n", *ostr);
}

View File

@@ -8,63 +8,78 @@
namespace DB
{
- void ProgressValues::read(ReadBuffer & in, UInt64 /*server_revision*/)
+ void ProgressValues::read(ReadBuffer & in, UInt64 server_revision)
{
- size_t new_rows = 0;
- size_t new_bytes = 0;
- size_t new_total_rows = 0;
+ size_t new_read_rows = 0;
+ size_t new_read_bytes = 0;
+ size_t new_total_rows_to_read = 0;
+ size_t new_written_rows = 0;
+ size_t new_written_bytes = 0;
- readVarUInt(new_rows, in);
- readVarUInt(new_bytes, in);
- readVarUInt(new_total_rows, in);
+ readVarUInt(new_read_rows, in);
+ readVarUInt(new_read_bytes, in);
+ readVarUInt(new_total_rows_to_read, in);
+ if (server_revision >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
+ {
+ readVarUInt(new_written_rows, in);
+ readVarUInt(new_written_bytes, in);
+ }
- rows = new_rows;
- bytes = new_bytes;
- total_rows = new_total_rows;
+ this->read_rows = new_read_rows;
+ this->read_bytes = new_read_bytes;
+ this->total_rows_to_read = new_total_rows_to_read;
+ this->written_rows = new_written_rows;
+ this->written_bytes = new_written_bytes;
}
- void ProgressValues::write(WriteBuffer & out, UInt64 /*client_revision*/) const
+ void ProgressValues::write(WriteBuffer & out, UInt64 client_revision) const
{
- writeVarUInt(rows, out);
- writeVarUInt(bytes, out);
- writeVarUInt(total_rows, out);
+ writeVarUInt(this->read_rows, out);
+ writeVarUInt(this->read_bytes, out);
+ writeVarUInt(this->total_rows_to_read, out);
+ if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
+ {
+ writeVarUInt(this->written_rows, out);
+ writeVarUInt(this->written_bytes, out);
+ }
}
void ProgressValues::writeJSON(WriteBuffer & out) const
{
/// Numbers are written in double quotes (as strings) to avoid loss of precision
/// of 64-bit integers after interpretation by JavaScript.
writeCString("{\"read_rows\":\"", out);
- writeText(rows, out);
+ writeText(this->read_rows, out);
writeCString("\",\"read_bytes\":\"", out);
- writeText(bytes, out);
- writeCString("\",\"total_rows\":\"", out);
- writeText(total_rows, out);
+ writeText(this->read_bytes, out);
+ writeCString("\",\"written_rows\":\"", out);
+ writeText(this->written_rows, out);
+ writeCString("\",\"written_bytes\":\"", out);
+ writeText(this->written_bytes, out);
+ writeCString("\",\"total_rows_to_read\":\"", out);
+ writeText(this->total_rows_to_read, out);
writeCString("\"}", out);
}
void Progress::read(ReadBuffer & in, UInt64 server_revision)
{
ProgressValues values;
values.read(in, server_revision);
- rows.store(values.rows, std::memory_order_relaxed);
- bytes.store(values.bytes, std::memory_order_relaxed);
- total_rows.store(values.total_rows, std::memory_order_relaxed);
+ read_rows.store(values.read_rows, std::memory_order_relaxed);
+ read_bytes.store(values.read_bytes, std::memory_order_relaxed);
+ total_rows_to_read.store(values.total_rows_to_read, std::memory_order_relaxed);
+ written_rows.store(values.written_rows, std::memory_order_relaxed);
+ written_bytes.store(values.written_bytes, std::memory_order_relaxed);
}
void Progress::write(WriteBuffer & out, UInt64 client_revision) const
{
getValues().write(out, client_revision);
}
void Progress::writeJSON(WriteBuffer & out) const
{
getValues().writeJSON(out);
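
The JSON built by writeJSON is exactly what goes into the HTTP progress and summary headers. A quick way to see the new field layout (a sketch mirroring the existing test script at the bottom of this diff; localhost:8123 is an assumption):

# Emit per-block progress headers and inspect the JSON payload.
curl -vsS "http://localhost:8123/?max_block_size=1&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0" \
    -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep 'X-ClickHouse-Progress'
# e.g. < X-ClickHouse-Progress: {"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}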

View File

@@ -6,26 +6,44 @@
#include <Core/Defines.h>
namespace DB
{
class ReadBuffer;
class WriteBuffer;
/// See Progress.
struct ProgressValues
{
- size_t rows;
- size_t bytes;
- size_t total_rows;
+ size_t read_rows;
+ size_t read_bytes;
+ size_t total_rows_to_read;
+ size_t written_rows;
+ size_t written_bytes;
void read(ReadBuffer & in, UInt64 server_revision);
void write(WriteBuffer & out, UInt64 client_revision) const;
void writeJSON(WriteBuffer & out) const;
};
+ struct ReadProgress
+ {
+ size_t read_rows;
+ size_t read_bytes;
+ size_t total_rows_to_read;
+ ReadProgress(size_t read_rows_, size_t read_bytes_, size_t total_rows_to_read_ = 0)
+ : read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_) {}
+ };
+ struct WriteProgress
+ {
+ size_t written_rows;
+ size_t written_bytes;
+ WriteProgress(size_t written_rows_, size_t written_bytes_)
+ : written_rows(written_rows_), written_bytes(written_bytes_) {}
+ };
/** Progress of query execution.
* Values, transferred over network are deltas - how much was done after previously sent value.
@@ -33,49 +51,62 @@ struct ProgressValues
*/
struct Progress
{
- std::atomic<size_t> rows {0}; /// Rows (source) processed.
- std::atomic<size_t> bytes {0}; /// Bytes (uncompressed, source) processed.
+ std::atomic<size_t> read_rows {0}; /// Rows (source) processed.
+ std::atomic<size_t> read_bytes {0}; /// Bytes (uncompressed, source) processed.
/** How much rows must be processed, in total, approximately. Non-zero value is sent when there is information about some new part of job.
* Received values must be summed to get estimate of total rows to process.
* Used for rendering progress bar on client.
*/
- std::atomic<size_t> total_rows {0};
+ std::atomic<size_t> total_rows_to_read {0};
+ std::atomic<size_t> written_rows {0};
+ std::atomic<size_t> written_bytes {0};
Progress() {}
- Progress(size_t rows_, size_t bytes_, size_t total_rows_ = 0)
- : rows(rows_), bytes(bytes_), total_rows(total_rows_) {}
+ Progress(size_t read_rows_, size_t read_bytes_, size_t total_rows_to_read_ = 0)
+ : read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_) {}
+ Progress(ReadProgress read_progress)
+ : read_rows(read_progress.read_rows), read_bytes(read_progress.read_bytes), total_rows_to_read(read_progress.total_rows_to_read) {}
+ Progress(WriteProgress write_progress)
+ : written_rows(write_progress.written_rows), written_bytes(write_progress.written_bytes) {}
void read(ReadBuffer & in, UInt64 server_revision);
void write(WriteBuffer & out, UInt64 client_revision) const;
/// Progress in JSON format (single line, without whitespaces) is used in HTTP headers.
void writeJSON(WriteBuffer & out) const;
/// Each value separately is changed atomically (but not whole object).
bool incrementPiecewiseAtomically(const Progress & rhs)
{
- rows += rhs.rows;
- bytes += rhs.bytes;
- total_rows += rhs.total_rows;
+ read_rows += rhs.read_rows;
+ read_bytes += rhs.read_bytes;
+ total_rows_to_read += rhs.total_rows_to_read;
+ written_rows += rhs.written_rows;
+ written_bytes += rhs.written_bytes;
- return rhs.rows ? true : false;
+ return rhs.read_rows || rhs.written_rows ? true : false;
}
void reset()
{
- rows = 0;
- bytes = 0;
- total_rows = 0;
+ read_rows = 0;
+ read_bytes = 0;
+ total_rows_to_read = 0;
+ written_rows = 0;
+ written_bytes = 0;
}
ProgressValues getValues() const
{
ProgressValues res;
- res.rows = rows.load(std::memory_order_relaxed);
- res.bytes = bytes.load(std::memory_order_relaxed);
- res.total_rows = total_rows.load(std::memory_order_relaxed);
+ res.read_rows = read_rows.load(std::memory_order_relaxed);
+ res.read_bytes = read_bytes.load(std::memory_order_relaxed);
+ res.total_rows_to_read = total_rows_to_read.load(std::memory_order_relaxed);
+ res.written_rows = written_rows.load(std::memory_order_relaxed);
+ res.written_bytes = written_bytes.load(std::memory_order_relaxed);
return res;
}
@@ -84,18 +115,22 @@ struct Progress
{
ProgressValues res;
- res.rows = rows.fetch_and(0);
- res.bytes = bytes.fetch_and(0);
- res.total_rows = total_rows.fetch_and(0);
+ res.read_rows = read_rows.fetch_and(0);
+ res.read_bytes = read_bytes.fetch_and(0);
+ res.total_rows_to_read = total_rows_to_read.fetch_and(0);
+ res.written_rows = written_rows.fetch_and(0);
+ res.written_bytes = written_bytes.fetch_and(0);
return res;
}
Progress & operator=(Progress && other)
{
- rows = other.rows.load(std::memory_order_relaxed);
- bytes = other.bytes.load(std::memory_order_relaxed);
- total_rows = other.total_rows.load(std::memory_order_relaxed);
+ read_rows = other.read_rows.load(std::memory_order_relaxed);
+ read_bytes = other.read_bytes.load(std::memory_order_relaxed);
+ total_rows_to_read = other.total_rows_to_read.load(std::memory_order_relaxed);
+ written_rows = other.written_rows.load(std::memory_order_relaxed);
+ written_bytes = other.written_bytes.load(std::memory_order_relaxed);
return *this;
}
@@ -106,5 +141,4 @@ struct Progress
}
};
}

View File

@@ -36,11 +36,39 @@ void WriteBufferFromHTTPServerResponse::startSendHeaders()
}
}
+ void WriteBufferFromHTTPServerResponse::writeHeaderSummary()
+ {
+ #if defined(POCO_CLICKHOUSE_PATCH)
+ if (headers_finished_sending)
+ return;
+ WriteBufferFromOwnString progress_string_writer;
+ accumulated_progress.writeJSON(progress_string_writer);
+ if (response_header_ostr)
+ *response_header_ostr << "X-ClickHouse-Summary: " << progress_string_writer.str() << "\r\n" << std::flush;
+ #endif
+ }
+ void WriteBufferFromHTTPServerResponse::writeHeaderProgress()
+ {
+ #if defined(POCO_CLICKHOUSE_PATCH)
+ if (headers_finished_sending)
+ return;
+ WriteBufferFromOwnString progress_string_writer;
+ accumulated_progress.writeJSON(progress_string_writer);
+ if (response_header_ostr)
+ *response_header_ostr << "X-ClickHouse-Progress: " << progress_string_writer.str() << "\r\n" << std::flush;
+ #endif
+ }
void WriteBufferFromHTTPServerResponse::finishSendHeaders()
{
if (!headers_finished_sending)
{
+ writeHeaderSummary();
headers_finished_sending = true;
if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
@@ -174,13 +202,7 @@ void WriteBufferFromHTTPServerResponse::onProgress(const Progress & progress)
/// Send all common headers before our special progress headers.
startSendHeaders();
- WriteBufferFromOwnString progress_string_writer;
- accumulated_progress.writeJSON(progress_string_writer);
- #if defined(POCO_CLICKHOUSE_PATCH)
- *response_header_ostr << "X-ClickHouse-Progress: " << progress_string_writer.str() << "\r\n" << std::flush;
- #endif
+ writeHeaderProgress();
}
}

View File

@@ -80,6 +80,11 @@ private:
/// but not finish them with \r\n, allowing to send more headers subsequently.
void startSendHeaders();
+ /// Writes the X-ClickHouse-Progress header
+ void writeHeaderProgress();
+ /// Writes the X-ClickHouse-Summary header
+ void writeHeaderSummary();
/// This method finish headers with \r\n, allowing to start to send body.
void finishSendHeaders();

View File

@@ -400,11 +400,13 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
res.client_info = client_info;
res.elapsed_seconds = watch.elapsedSeconds();
res.is_cancelled = is_killed.load(std::memory_order_relaxed);
- res.read_rows = progress_in.rows;
- res.read_bytes = progress_in.bytes;
- res.total_rows = progress_in.total_rows;
- res.written_rows = progress_out.rows;
- res.written_bytes = progress_out.bytes;
+ res.read_rows = progress_in.read_rows;
+ res.read_bytes = progress_in.read_bytes;
+ res.total_rows = progress_in.total_rows_to_read;
+ /// TODO: Use written_rows and written_bytes when real time progress is implemented
+ res.written_rows = progress_out.read_rows;
+ res.written_bytes = progress_out.read_bytes;
if (thread_group)
{

View File

@@ -154,10 +154,12 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
elem.query_start_time = query_start_time;
elem.query_duration_ms = (getCurrentTimeNanoseconds() - query_start_time_nanoseconds) / 1000000U;
- elem.read_rows = progress_in.rows.load(std::memory_order_relaxed);
- elem.read_bytes = progress_in.bytes.load(std::memory_order_relaxed);
- elem.written_rows = progress_out.rows.load(std::memory_order_relaxed);
- elem.written_bytes = progress_out.bytes.load(std::memory_order_relaxed);
+ elem.read_rows = progress_in.read_rows.load(std::memory_order_relaxed);
+ elem.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed);
+ /// TODO: Use written_rows and written_bytes when run time progress is implemented
+ elem.written_rows = progress_out.read_rows.load(std::memory_order_relaxed);
+ elem.written_bytes = progress_out.read_bytes.load(std::memory_order_relaxed);
elem.memory_usage = memory_tracker.get();
elem.peak_memory_usage = memory_tracker.getPeak();

View File

@@ -316,6 +316,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.written_rows = info.written_rows;
elem.written_bytes = info.written_bytes;
+ auto progress_callback = context.getProgressCallback();
+ if (progress_callback)
+ progress_callback(Progress(WriteProgress(info.written_rows, info.written_bytes)));
elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
if (stream_in)
@@ -331,8 +336,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (auto counting_stream = dynamic_cast<const CountingBlockOutputStream *>(stream_out))
{
/// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.query_settings = process_list_elem->progress_in
- elem.result_rows = counting_stream->getProgress().rows;
- elem.result_bytes = counting_stream->getProgress().bytes;
+ elem.result_rows = counting_stream->getProgress().read_rows;
+ elem.result_bytes = counting_stream->getProgress().read_bytes;
}
}

View File

@@ -494,17 +494,17 @@ public:
void operator() (const Progress & value)
{
- ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
+ ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes);
if (stage.is_first)
- ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
+ ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows);
updateWatch();
- merge_entry->bytes_read_uncompressed += value.bytes;
+ merge_entry->bytes_read_uncompressed += value.read_bytes;
if (stage.is_first)
- merge_entry->rows_read += value.rows;
+ merge_entry->rows_read += value.read_rows;
- stage.total_rows += value.total_rows;
- stage.rows_read += value.rows;
+ stage.total_rows += value.total_rows_to_read;
+ stage.rows_read += value.read_rows;
if (stage.total_rows > 0)
{
merge_entry->progress.store(

View File

@@ -1,17 +1,19 @@
- < X-ClickHouse-Progress: {"read_rows":"0","read_bytes":"0","total_rows":"10"}
- < X-ClickHouse-Progress: {"read_rows":"5","read_bytes":"40","total_rows":"10"}
- < X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","total_rows":"10"}
+ < X-ClickHouse-Progress: {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
+ < X-ClickHouse-Progress: {"read_rows":"5","read_bytes":"40","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
+ < X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
+ < X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
9
- < X-ClickHouse-Progress: {"read_rows":"1","read_bytes":"8","total_rows":"0"}
- < X-ClickHouse-Progress: {"read_rows":"2","read_bytes":"16","total_rows":"0"}
- < X-ClickHouse-Progress: {"read_rows":"3","read_bytes":"24","total_rows":"0"}
- < X-ClickHouse-Progress: {"read_rows":"4","read_bytes":"32","total_rows":"0"}
- < X-ClickHouse-Progress: {"read_rows":"5","read_bytes":"40","total_rows":"0"}
- < X-ClickHouse-Progress: {"read_rows":"6","read_bytes":"48","total_rows":"0"}
- < X-ClickHouse-Progress: {"read_rows":"7","read_bytes":"56","total_rows":"0"}
- < X-ClickHouse-Progress: {"read_rows":"8","read_bytes":"64","total_rows":"0"}
- < X-ClickHouse-Progress: {"read_rows":"9","read_bytes":"72","total_rows":"0"}
- < X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","total_rows":"0"}
+ < X-ClickHouse-Progress: {"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
+ < X-ClickHouse-Progress: {"read_rows":"2","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
+ < X-ClickHouse-Progress: {"read_rows":"3","read_bytes":"24","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
+ < X-ClickHouse-Progress: {"read_rows":"4","read_bytes":"32","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
+ < X-ClickHouse-Progress: {"read_rows":"5","read_bytes":"40","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
+ < X-ClickHouse-Progress: {"read_rows":"6","read_bytes":"48","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
+ < X-ClickHouse-Progress: {"read_rows":"7","read_bytes":"56","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
+ < X-ClickHouse-Progress: {"read_rows":"8","read_bytes":"64","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
+ < X-ClickHouse-Progress: {"read_rows":"9","read_bytes":"72","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
+ < X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
+ < X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
0
1
2
@@ -32,3 +34,4 @@
7
8
9
+ < X-ClickHouse-Summary: {"read_rows":"10","read_bytes":"80","written_rows":"10","written_bytes":"40","total_rows_to_read":"0"}

View File

@@ -17,3 +17,17 @@ ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?max_block_size=1&send_progress_in_htt
# nothing in body = no gzip
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?max_block_size=1&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0&enable_http_compression=1" -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 0' 2>&1 | grep -q 'Content-Encoding: gzip' && echo 'Fail' || true
+ # test insertion stats
+ ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}" -H 'Accept-Encoding: gzip' -d 'DROP TABLE IF EXISTS insert_number_query' > /dev/null 2>&1
+ ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}" -H 'Accept-Encoding: gzip' -d 'DROP TABLE IF EXISTS insert_number_query_2' > /dev/null 2>&1
+ ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}" -H 'Accept-Encoding: gzip' -d 'CREATE TABLE insert_number_query (record UInt32) Engine = Memory' > /dev/null 2>&1
+ ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}" -H 'Accept-Encoding: gzip' -d 'CREATE TABLE insert_number_query_2 (record UInt32) Engine = Memory' > /dev/null 2>&1
+ ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?max_block_size=1&http_headers_progress_interval_ms=0&send_progress_in_http_headers=1" -d 'INSERT INTO insert_number_query (record) SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep -E 'Content-Encoding|X-ClickHouse-Summary|^[0-9]'
+ ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}" -H 'Accept-Encoding: gzip' -d 'DROP TABLE insert_number_query' > /dev/null 2>&1
+ ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}" -H 'Accept-Encoding: gzip' -d 'DROP TABLE insert_number_query_2' > /dev/null 2>&1