ClickHouse/src/IO/WriteBufferFromHTTPServerResponse.cpp

220 lines
5.5 KiB
C++
Raw Normal View History

#include <Poco/Version.h>
#include <Poco/Net/HTTPServerResponse.h>
2019-02-10 17:40:52 +00:00
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteBufferFromString.h>
#include <IO/HTTPCommon.h>
2019-02-10 17:40:52 +00:00
#include <IO/Progress.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include <Common/Stopwatch.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
2019-02-10 17:40:52 +00:00
2017-01-30 05:13:58 +00:00
namespace DB
{
/// Currently empty: no error codes are declared here.
namespace ErrorCodes
{
}
void WriteBufferFromHTTPServerResponse::startSendHeaders()
{
if (!headers_started_sending)
{
headers_started_sending = true;
2017-01-30 05:13:58 +00:00
if (add_cors_header)
response.set("Access-Control-Allow-Origin", "*");
2017-01-30 05:13:58 +00:00
setResponseDefaultHeaders(response, keep_alive_timeout);
2017-01-30 05:13:58 +00:00
2019-02-15 11:46:07 +00:00
#if defined(POCO_CLICKHOUSE_PATCH)
if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
#endif
}
2017-01-30 05:13:58 +00:00
}
/// Emits the accumulated progress (as produced by accumulated_progress.writeJSON())
/// in an "X-ClickHouse-Summary" header line and flushes the header stream.
/// Compiled to a no-op without the patched Poco; also does nothing once the headers
/// have been finished or when there is no header stream.
void WriteBufferFromHTTPServerResponse::writeHeaderSummary()
{
#if defined(POCO_CLICKHOUSE_PATCH)
    if (!headers_finished_sending)
    {
        WriteBufferFromOwnString summary_json;
        accumulated_progress.writeJSON(summary_json);

        if (response_header_ostr)
        {
            auto & header_stream = *response_header_ostr;
            header_stream << "X-ClickHouse-Summary: " << summary_json.str() << "\r\n" << std::flush;
        }
    }
#endif
}
/// Emits the accumulated progress (as produced by accumulated_progress.writeJSON())
/// in an "X-ClickHouse-Progress" header line and flushes the header stream.
/// Compiled to a no-op without the patched Poco; also does nothing once the headers
/// have been finished or when there is no header stream.
void WriteBufferFromHTTPServerResponse::writeHeaderProgress()
{
#if defined(POCO_CLICKHOUSE_PATCH)
    if (!headers_finished_sending)
    {
        WriteBufferFromOwnString progress_json;
        accumulated_progress.writeJSON(progress_json);

        if (response_header_ostr)
        {
            auto & header_stream = *response_header_ostr;
            header_stream << "X-ClickHouse-Progress: " << progress_json.str() << "\r\n" << std::flush;
        }
    }
#endif
}
2017-01-30 05:13:58 +00:00
/// Completes the header section of the response. Only the first call does anything.
/// The summary header is written before headers_finished_sending is set, because
/// writeHeaderSummary() refuses to write once that flag is true.
void WriteBufferFromHTTPServerResponse::finishSendHeaders()
{
    if (headers_finished_sending)
        return;

    writeHeaderSummary();
    headers_finished_sending = true;

#if defined(POCO_CLICKHOUSE_PATCH)
    if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
    {
        /// Send end of headers delimiter.
        if (response_header_ostr)
            *response_header_ostr << "\r\n" << std::flush;
        return;
    }
#endif

    /// Reached for HEAD requests, and for every request when Poco is not patched:
    /// the newline is autosent by response.send() if there is nothing to send in the body.
    if (!response_body_ostr)
        response_body_ostr = &(response.send());
}
void WriteBufferFromHTTPServerResponse::nextImpl()
{
{
2019-01-02 06:44:36 +00:00
std::lock_guard lock(mutex);
2017-01-30 05:13:58 +00:00
startSendHeaders();
2017-01-30 05:13:58 +00:00
if (!out && request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
{
if (compress)
{
auto content_encoding_name = toContentEncodingName(compression_method);
#if defined(POCO_CLICKHOUSE_PATCH)
*response_header_ostr << "Content-Encoding: " << content_encoding_name << "\r\n";
#else
response.set("Content-Encoding", content_encoding_name);
#endif
}
2020-01-04 22:59:08 +00:00
2019-02-15 11:46:07 +00:00
#if !defined(POCO_CLICKHOUSE_PATCH)
2020-01-04 22:59:08 +00:00
response_body_ostr = &(response.send());
#endif
2020-01-04 22:59:08 +00:00
2020-01-05 01:49:12 +00:00
/// We reuse our buffer in "out" to avoid extra allocations and copies.
if (compress)
out = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromOStream>(*response_body_ostr),
compress ? compression_method : CompressionMethod::None,
compression_level,
working_buffer.size(),
working_buffer.begin());
else
out = std::make_unique<WriteBufferFromOStream>(
*response_body_ostr,
working_buffer.size(),
working_buffer.begin());
}
2017-01-30 05:13:58 +00:00
finishSendHeaders();
}
2017-01-30 05:13:58 +00:00
if (out)
{
2020-01-05 01:49:12 +00:00
out->buffer() = buffer();
out->position() = position();
out->next();
}
2017-01-30 05:13:58 +00:00
}
/// Creates a write buffer bound to the given HTTP request/response pair.
/// The buffer owns DBMS_DEFAULT_BUFFER_SIZE bytes of memory; the request and response are
/// kept by reference, and the remaining arguments are stored as given.
/// Nothing is sent at construction time.
WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
    Poco::Net::HTTPServerRequest & request_,
    Poco::Net::HTTPServerResponse & response_,
    unsigned keep_alive_timeout_,
    bool compress_,
    CompressionMethod compression_method_)
    : BufferWithOwnMemory<WriteBuffer>(DBMS_DEFAULT_BUFFER_SIZE),
      request(request_),
      response(response_),
      keep_alive_timeout(keep_alive_timeout_),
      compress(compress_),
      compression_method(compression_method_)
{
}
void WriteBufferFromHTTPServerResponse::onProgress(const Progress & progress)
{
2019-01-02 06:44:36 +00:00
std::lock_guard lock(mutex);
2017-01-30 05:13:58 +00:00
/// Cannot add new headers if body was started to send.
if (headers_finished_sending)
return;
2017-01-30 05:13:58 +00:00
accumulated_progress.incrementPiecewiseAtomically(progress);
2017-01-30 05:13:58 +00:00
if (progress_watch.elapsed() >= send_progress_interval_ms * 1000000)
{
progress_watch.restart();
2017-01-30 05:13:58 +00:00
/// Send all common headers before our special progress headers.
startSendHeaders();
writeHeaderProgress();
}
2017-01-30 05:13:58 +00:00
}
void WriteBufferFromHTTPServerResponse::finalize()
{
if (offset())
{
next();
2020-01-07 08:32:58 +00:00
if (out)
out.reset();
}
else
{
/// If no remaining data, just send headers.
2019-01-02 06:44:36 +00:00
std::lock_guard lock(mutex);
startSendHeaders();
finishSendHeaders();
}
2017-01-30 05:13:58 +00:00
}
/// Finalizes the response when the buffer is destroyed.
/// Anything thrown by finalize() is caught here and handed to tryLogCurrentException()
/// so that no exception leaves the destructor.
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
    try
    {
        finalize();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
}