Return to old implementation of read and write values for progress

This commit is contained in:
Guillaume Tassery 2019-05-13 11:48:09 +07:00
parent a4c891e529
commit 1b30f91c42
4 changed files with 62 additions and 177 deletions

View File

@ -8,7 +8,7 @@
namespace DB namespace DB
{ {
void AllProgressValueImpl::read(ProgressValues & value, ReadBuffer & in, UInt64 /*server_revision*/) void ProgressValues::read(ReadBuffer & in, UInt64 server_revision)
{ {
size_t new_rows = 0; size_t new_rows = 0;
size_t new_bytes = 0; size_t new_bytes = 0;
@ -19,92 +19,70 @@ void AllProgressValueImpl::read(ProgressValues & value, ReadBuffer & in, UInt64
readVarUInt(new_rows, in); readVarUInt(new_rows, in);
readVarUInt(new_bytes, in); readVarUInt(new_bytes, in);
readVarUInt(new_total_rows, in); readVarUInt(new_total_rows, in);
readVarUInt(new_write_rows, in); if (DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO >= server_revision)
readVarUInt(new_write_bytes, in); {
readVarUInt(new_write_rows, in);
readVarUInt(new_write_bytes, in);
}
value.rows = new_rows; this->rows = new_rows;
value.bytes = new_bytes; this->bytes = new_bytes;
value.total_rows = new_total_rows; this->total_rows = new_total_rows;
value.write_rows = new_write_rows; this->write_rows = new_write_rows;
value.write_bytes = new_write_bytes; this->write_bytes = new_write_bytes;
} }
void AllProgressValueImpl::write(const ProgressValues & value, WriteBuffer & out, UInt64 /*client_revision*/) void ProgressValues::write(WriteBuffer & out, UInt64 client_revision) const
{ {
writeVarUInt(value.rows, out); writeVarUInt(this->rows, out);
writeVarUInt(value.bytes, out); writeVarUInt(this->bytes, out);
writeVarUInt(value.total_rows, out); writeVarUInt(this->total_rows, out);
writeVarUInt(value.write_rows, out); if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
writeVarUInt(value.write_bytes, out); {
writeVarUInt(this->write_rows, out);
writeVarUInt(this->write_bytes, out);
}
} }
void AllProgressValueImpl::writeJSON(const ProgressValues & value, WriteBuffer & out) void ProgressValues::writeJSON(WriteBuffer & out) const
{ {
/// Numbers are written in double quotes (as strings) to avoid loss of precision /// Numbers are written in double quotes (as strings) to avoid loss of precision
/// of 64-bit integers after interpretation by JavaScript. /// of 64-bit integers after interpretation by JavaScript.
writeCString("{\"read_rows\":\"", out); writeCString("{\"read_rows\":\"", out);
writeText(value.rows, out); writeText(this->rows, out);
writeCString("\",\"read_bytes\":\"", out); writeCString("\",\"read_bytes\":\"", out);
writeText(value.bytes, out); writeText(this->bytes, out);
writeCString("\",\"write_rows\":\"", out); writeCString("\",\"written_rows\":\"", out);
writeText(value.write_rows, out); writeText(this->write_rows, out);
writeCString("\",\"write_bytes\":\"", out); writeCString("\",\"written_bytes\":\"", out);
writeText(value.write_bytes, out); writeText(this->write_bytes, out);
writeCString("\",\"total_rows\":\"", out); writeCString("\",\"total_rows\":\"", out);
writeText(value.total_rows, out); writeText(this->total_rows, out);
writeCString("\"}", out); writeCString("\"}", out);
} }
void ReadProgressValueImpl::read(ProgressValues & value, ReadBuffer & in, UInt64 /*server_revision*/) void Progress::read(ReadBuffer & in, UInt64 server_revision)
{ {
size_t new_rows = 0; ProgressValues values;
size_t new_bytes = 0; values.read(in, server_revision);
size_t new_total_rows = 0;
readVarUInt(new_rows, in); rows.store(values.rows, std::memory_order_relaxed);
readVarUInt(new_bytes, in); bytes.store(values.bytes, std::memory_order_relaxed);
readVarUInt(new_total_rows, in); total_rows.store(values.total_rows, std::memory_order_relaxed);
write_rows.store(values.write_rows, std::memory_order_relaxed);
value.rows = new_rows; write_bytes.store(values.write_bytes, std::memory_order_relaxed);
value.bytes = new_bytes;
value.total_rows = new_total_rows;
} }
void Progress::write(WriteBuffer & out, UInt64 client_revision) const
void ReadProgressValueImpl::write(const ProgressValues & value, WriteBuffer & out, UInt64 /*client_revision*/)
{ {
writeVarUInt(value.rows, out); getValues().write(out, client_revision);
writeVarUInt(value.bytes, out);
writeVarUInt(value.total_rows, out);
} }
void Progress::writeJSON(WriteBuffer & out) const
void ReadProgressValueImpl::writeJSON(const ProgressValues & value, WriteBuffer & out)
{ {
/// Numbers are written in double quotes (as strings) to avoid loss of precision getValues().writeJSON(out);
/// of 64-bit integers after interpretation by JavaScript.
writeCString("{\"read_rows\":\"", out);
writeText(value.rows, out);
writeCString("\",\"read_bytes\":\"", out);
writeText(value.bytes, out);
writeCString("\",\"total_rows\":\"", out);
writeText(value.total_rows, out);
writeCString("\"}", out);
}
void WriteProgressValueImpl::writeJSON(const ProgressValues & value, WriteBuffer & out)
{
/// Numbers are written in double quotes (as strings) to avoid loss of precision
/// of 64-bit integers after interpretation by JavaScript.
writeCString("{\"write_rows\":\"", out);
writeText(value.write_rows, out);
writeCString("\",\"write_bytes\":\"", out);
writeText(value.write_bytes, out);
writeCString("\"}", out);
} }
} }

View File

@ -12,8 +12,6 @@ namespace DB
class ReadBuffer; class ReadBuffer;
class WriteBuffer; class WriteBuffer;
struct AllProgressValueImpl;
/// See Progress. /// See Progress.
struct ProgressValues struct ProgressValues
{ {
@ -23,34 +21,9 @@ struct ProgressValues
size_t write_rows; size_t write_rows;
size_t write_bytes; size_t write_bytes;
void read(ReadBuffer & in, UInt64 server_revision);
template <typename ReadImpl = AllProgressValueImpl> void write(WriteBuffer & out, UInt64 client_revision) const;
inline void read(ReadBuffer & in, UInt64 server_revision); void writeJSON(WriteBuffer & out) const;
template <typename WriteImpl = AllProgressValueImpl>
inline void write(WriteBuffer & out, UInt64 client_revision) const;
template <typename WriteJSONImpl = AllProgressValueImpl>
inline void writeJSON(WriteBuffer & out) const;
};
struct AllProgressValueImpl
{
static void read(ProgressValues & value, ReadBuffer & in, UInt64 server_revision);
static void write(const ProgressValues & value, WriteBuffer & out, UInt64 client_revision);
static void writeJSON(const ProgressValues & value, WriteBuffer & out);
};
struct ReadProgressValueImpl : public AllProgressValueImpl
{
static void read(ProgressValues & value, ReadBuffer & in, UInt64 server_revision);
static void write(const ProgressValues & value, WriteBuffer & out, UInt64 client_revision);
static void writeJSON(const ProgressValues & value, WriteBuffer & out);
};
struct WriteProgressValueImpl : public AllProgressValueImpl
{
static void writeJSON(const ProgressValues & value, WriteBuffer & out);
}; };
struct ReadProgress struct ReadProgress
@ -99,14 +72,9 @@ struct Progress
Progress(WriteProgress write_progress) Progress(WriteProgress write_progress)
: write_rows(write_progress.write_rows), write_bytes(write_progress.write_bytes) {} : write_rows(write_progress.write_rows), write_bytes(write_progress.write_bytes) {}
template <typename T = AllProgressValueImpl>
void read(ReadBuffer & in, UInt64 server_revision); void read(ReadBuffer & in, UInt64 server_revision);
template <typename T = AllProgressValueImpl>
void write(WriteBuffer & out, UInt64 client_revision) const; void write(WriteBuffer & out, UInt64 client_revision) const;
/// Progress in JSON format (single line, without whitespaces) is used in HTTP headers. /// Progress in JSON format (single line, without whitespaces) is used in HTTP headers.
template <typename T = AllProgressValueImpl>
void writeJSON(WriteBuffer & out) const; void writeJSON(WriteBuffer & out) const;
/// Each value separately is changed atomically (but not whole object). /// Each value separately is changed atomically (but not whole object).
@ -173,65 +141,4 @@ struct Progress
} }
}; };
template <typename T>
void Progress::read(ReadBuffer & in, UInt64 server_revision)
{
ProgressValues values;
values.read<T>(in, server_revision);
rows.store(values.rows, std::memory_order_relaxed);
bytes.store(values.bytes, std::memory_order_relaxed);
total_rows.store(values.total_rows, std::memory_order_relaxed);
write_rows.store(values.write_rows, std::memory_order_relaxed);
write_bytes.store(values.write_bytes, std::memory_order_relaxed);
}
template <typename T>
void Progress::write(WriteBuffer & out, UInt64 client_revision) const
{
getValues().write<T>(out, client_revision);
}
template <typename T>
void Progress::writeJSON(WriteBuffer & out) const
{
getValues().writeJSON<T>(out);
}
template <typename ReadImpl>
inline void ProgressValues::read(ReadBuffer & in, UInt64 server_revision)
{
if (server_revision >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
ReadImpl::read(*this, in, server_revision);
else
read<ReadProgressValueImpl>(in, server_revision);
}
template <>
inline void ProgressValues::read<ReadProgressValueImpl>(ReadBuffer & in, UInt64 server_revision)
{
ReadProgressValueImpl::read(*this, in, server_revision);
}
template <typename WriteImpl>
inline void ProgressValues::write(WriteBuffer & out, UInt64 client_revision) const
{
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
WriteImpl::write(*this, out, client_revision);
else
write<ReadProgressValueImpl>(out, client_revision);
}
template <>
inline void ProgressValues::write<ReadProgressValueImpl>(WriteBuffer & out, UInt64 client_revision) const
{
ReadProgressValueImpl::write(*this, out, client_revision);
}
template <typename WriteJSONImpl>
void ProgressValues::writeJSON(WriteBuffer & out) const
{
WriteJSONImpl::writeJSON(*this, out);
}
} }

View File

@ -38,13 +38,13 @@ void WriteBufferFromHTTPServerResponse::startSendHeaders()
void WriteBufferFromHTTPServerResponse::writeHeaderSummary() void WriteBufferFromHTTPServerResponse::writeHeaderSummary()
{ {
#if defined(POCO_CLICKHOUSE_PATCH)
if (headers_finished_sending) if (headers_finished_sending)
return; return;
WriteBufferFromOwnString progress_string_writer; WriteBufferFromOwnString progress_string_writer;
accumulated_progress.writeJSON(progress_string_writer); accumulated_progress.writeJSON(progress_string_writer);
#if defined(POCO_CLICKHOUSE_PATCH)
if (response_header_ostr) if (response_header_ostr)
*response_header_ostr << "X-ClickHouse-Summary: " << progress_string_writer.str() << "\r\n" << std::flush; *response_header_ostr << "X-ClickHouse-Summary: " << progress_string_writer.str() << "\r\n" << std::flush;
#endif #endif
@ -52,13 +52,13 @@ void WriteBufferFromHTTPServerResponse::writeHeaderSummary()
void WriteBufferFromHTTPServerResponse::writeHeaderProgress() void WriteBufferFromHTTPServerResponse::writeHeaderProgress()
{ {
#if defined(POCO_CLICKHOUSE_PATCH)
if (headers_finished_sending) if (headers_finished_sending)
return; return;
WriteBufferFromOwnString progress_string_writer; WriteBufferFromOwnString progress_string_writer;
accumulated_progress.writeJSON<ReadProgressValueImpl>(progress_string_writer); accumulated_progress.writeJSON(progress_string_writer);
#if defined(POCO_CLICKHOUSE_PATCH)
*response_header_ostr << "X-ClickHouse-Progress: " << progress_string_writer.str() << "\r\n" << std::flush; *response_header_ostr << "X-ClickHouse-Progress: " << progress_string_writer.str() << "\r\n" << std::flush;
#endif #endif
} }

View File

@ -1,19 +1,19 @@
< X-ClickHouse-Progress: {"read_rows":"0","read_bytes":"0","total_rows":"10"} < X-ClickHouse-Progress: {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows":"10"}
< X-ClickHouse-Progress: {"read_rows":"5","read_bytes":"40","total_rows":"10"} < X-ClickHouse-Progress: {"read_rows":"5","read_bytes":"40","written_rows":"0","written_bytes":"0","total_rows":"10"}
< X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","total_rows":"10"} < X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows":"10"}
< X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","total_rows":"10"} < X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows":"10"}
9 9
< X-ClickHouse-Progress: {"read_rows":"1","read_bytes":"8","total_rows":"0"} < X-ClickHouse-Progress: {"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows":"0"}
< X-ClickHouse-Progress: {"read_rows":"2","read_bytes":"16","total_rows":"0"} < X-ClickHouse-Progress: {"read_rows":"2","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows":"0"}
< X-ClickHouse-Progress: {"read_rows":"3","read_bytes":"24","total_rows":"0"} < X-ClickHouse-Progress: {"read_rows":"3","read_bytes":"24","written_rows":"0","written_bytes":"0","total_rows":"0"}
< X-ClickHouse-Progress: {"read_rows":"4","read_bytes":"32","total_rows":"0"} < X-ClickHouse-Progress: {"read_rows":"4","read_bytes":"32","written_rows":"0","written_bytes":"0","total_rows":"0"}
< X-ClickHouse-Progress: {"read_rows":"5","read_bytes":"40","total_rows":"0"} < X-ClickHouse-Progress: {"read_rows":"5","read_bytes":"40","written_rows":"0","written_bytes":"0","total_rows":"0"}
< X-ClickHouse-Progress: {"read_rows":"6","read_bytes":"48","total_rows":"0"} < X-ClickHouse-Progress: {"read_rows":"6","read_bytes":"48","written_rows":"0","written_bytes":"0","total_rows":"0"}
< X-ClickHouse-Progress: {"read_rows":"7","read_bytes":"56","total_rows":"0"} < X-ClickHouse-Progress: {"read_rows":"7","read_bytes":"56","written_rows":"0","written_bytes":"0","total_rows":"0"}
< X-ClickHouse-Progress: {"read_rows":"8","read_bytes":"64","total_rows":"0"} < X-ClickHouse-Progress: {"read_rows":"8","read_bytes":"64","written_rows":"0","written_bytes":"0","total_rows":"0"}
< X-ClickHouse-Progress: {"read_rows":"9","read_bytes":"72","total_rows":"0"} < X-ClickHouse-Progress: {"read_rows":"9","read_bytes":"72","written_rows":"0","written_bytes":"0","total_rows":"0"}
< X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","total_rows":"0"} < X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows":"0"}
< X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","total_rows":"0"} < X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows":"0"}
0 0
1 1
2 2
@ -34,4 +34,4 @@
7 7
8 8
9 9
< X-ClickHouse-Summary: {"read_rows":"10","read_bytes":"80","write_rows":"10","write_bytes":"40","total_rows":"0"} < X-ClickHouse-Summary: {"read_rows":"10","read_bytes":"80","written_rows":"10","written_bytes":"40","total_rows":"0"}