mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Moved code to .cpp [#CLICKHOUSE-2027].
This commit is contained in:
parent
3552de49ea
commit
3b0900e6cf
@ -1,6 +1,14 @@
|
||||
#pragma once
|
||||
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
|
||||
namespace Poco
|
||||
{
|
||||
namespace Net
|
||||
{
|
||||
class HTTPServerResponse;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -3,14 +3,9 @@
|
||||
#include <experimental/optional>
|
||||
#include <mutex>
|
||||
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
|
||||
#include <DB/Common/Exception.h>
|
||||
|
||||
#include <DB/IO/WriteBuffer.h>
|
||||
#include <DB/IO/BufferWithOwnMemory.h>
|
||||
#include <DB/IO/WriteBufferFromOStream.h>
|
||||
#include <DB/IO/WriteBufferFromString.h>
|
||||
#include <DB/IO/ZlibDeflatingWriteBuffer.h>
|
||||
#include <DB/IO/HTTPCommon.h>
|
||||
#include <DB/Common/NetException.h>
|
||||
@ -18,15 +13,18 @@
|
||||
#include <DB/Core/Progress.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
namespace Poco
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
namespace Net
|
||||
{
|
||||
class HTTPServerResponse;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// The difference from WriteBufferFromOStream is that this buffer gets the underlying std::ostream
|
||||
/// (using response.send()) only after data is flushed for the first time. This is needed in HTTP
|
||||
/// servers to change some HTTP headers (e.g. response code) before any data is sent to the client
|
||||
@ -71,126 +69,28 @@ private:
|
||||
/// Must be called under locked mutex.
|
||||
/// This method send headers, if this was not done already,
|
||||
/// but not finish them with \r\n, allowing to send more headers subsequently.
|
||||
void startSendHeaders()
|
||||
{
|
||||
if (!headers_started_sending)
|
||||
{
|
||||
headers_started_sending = true;
|
||||
|
||||
if (add_cors_header)
|
||||
response.set("Access-Control-Allow-Origin", "*");
|
||||
|
||||
setResponseDefaultHeaders(response);
|
||||
|
||||
std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
|
||||
}
|
||||
}
|
||||
void startSendHeaders();
|
||||
|
||||
/// This method finish headers with \r\n, allowing to start to send body.
|
||||
void finishSendHeaders()
|
||||
{
|
||||
if (!headers_finished_sending)
|
||||
{
|
||||
headers_finished_sending = true;
|
||||
void finishSendHeaders();
|
||||
|
||||
/// Send end of headers delimiter.
|
||||
*response_header_ostr << "\r\n" << std::flush;
|
||||
}
|
||||
}
|
||||
|
||||
void nextImpl() override
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
startSendHeaders();
|
||||
|
||||
if (!out)
|
||||
{
|
||||
if (compress)
|
||||
{
|
||||
if (compression_method == ZlibCompressionMethod::Gzip)
|
||||
*response_header_ostr << "Content-Encoding: gzip\r\n";
|
||||
else if (compression_method == ZlibCompressionMethod::Zlib)
|
||||
*response_header_ostr << "Content-Encoding: deflate\r\n";
|
||||
else
|
||||
throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
/// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy.
|
||||
out_raw.emplace(*response_body_ostr);
|
||||
deflating_buf.emplace(out_raw.value(), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
|
||||
out = &deflating_buf.value();
|
||||
}
|
||||
else
|
||||
{
|
||||
out_raw.emplace(*response_body_ostr, working_buffer.size(), working_buffer.begin());
|
||||
out = &out_raw.value();
|
||||
}
|
||||
}
|
||||
|
||||
finishSendHeaders();
|
||||
}
|
||||
|
||||
out->position() = position();
|
||||
out->next();
|
||||
}
|
||||
void nextImpl() override;
|
||||
|
||||
public:
|
||||
WriteBufferFromHTTPServerResponse(
|
||||
Poco::Net::HTTPServerResponse & response_,
|
||||
bool compress_ = false, /// If true - set Content-Encoding header and compress the result.
|
||||
ZlibCompressionMethod compression_method_ = ZlibCompressionMethod::Gzip,
|
||||
size_t size = DBMS_DEFAULT_BUFFER_SIZE)
|
||||
: BufferWithOwnMemory<WriteBuffer>(size), response(response_),
|
||||
compress(compress_), compression_method(compression_method_) {}
|
||||
size_t size = DBMS_DEFAULT_BUFFER_SIZE);
|
||||
|
||||
/// Writes progess in repeating HTTP headers.
|
||||
void onProgress(const Progress & progress)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
/// Cannot add new headers if body was started to send.
|
||||
if (headers_finished_sending)
|
||||
return;
|
||||
|
||||
accumulated_progress.incrementPiecewiseAtomically(progress);
|
||||
|
||||
if (progress_watch.elapsed() >= send_progress_interval_ms * 1000000)
|
||||
{
|
||||
progress_watch.restart();
|
||||
|
||||
/// Send all common headers before our special progress headers.
|
||||
startSendHeaders();
|
||||
|
||||
std::string progress_string;
|
||||
{
|
||||
WriteBufferFromString progress_string_writer(progress_string);
|
||||
accumulated_progress.writeJSON(progress_string_writer);
|
||||
}
|
||||
|
||||
*response_header_ostr << "X-ClickHouse-Progress: " << progress_string << "\r\n" << std::flush;
|
||||
}
|
||||
}
|
||||
void onProgress(const Progress & progress);
|
||||
|
||||
/// Send at least HTTP headers if no data has been sent yet.
|
||||
/// Use after the data has possibly been sent and no error happened (and thus you do not plan
|
||||
/// to change response HTTP code.
|
||||
/// This method is idempotent.
|
||||
void finalize()
|
||||
{
|
||||
if (offset())
|
||||
{
|
||||
next();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// If no remaining data, just send headers.
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
startSendHeaders();
|
||||
finishSendHeaders();
|
||||
}
|
||||
}
|
||||
void finalize();
|
||||
|
||||
/// Turn compression on or off.
|
||||
/// The setting has any effect only if HTTP headers haven't been sent yet.
|
||||
@ -219,17 +119,7 @@ public:
|
||||
send_progress_interval_ms = send_progress_interval_ms_;
|
||||
}
|
||||
|
||||
~WriteBufferFromHTTPServerResponse()
|
||||
{
|
||||
try
|
||||
{
|
||||
finalize();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
~WriteBufferFromHTTPServerResponse();
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
#include <DB/IO/HTTPCommon.h>
|
||||
|
||||
#include <Poco/Util/Application.h>
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
156
dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp
Normal file
156
dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp
Normal file
@ -0,0 +1,156 @@
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
|
||||
#include <DB/Common/Exception.h>
|
||||
|
||||
#include <DB/IO/WriteBufferFromHTTPServerResponse.h>
|
||||
#include <DB/IO/WriteBufferFromString.h>
|
||||
#include <DB/IO/HTTPCommon.h>
|
||||
#include <DB/Common/NetException.h>
|
||||
#include <DB/Common/Stopwatch.h>
|
||||
#include <DB/Core/Progress.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
void WriteBufferFromHTTPServerResponse::startSendHeaders()
|
||||
{
|
||||
if (!headers_started_sending)
|
||||
{
|
||||
headers_started_sending = true;
|
||||
|
||||
if (add_cors_header)
|
||||
response.set("Access-Control-Allow-Origin", "*");
|
||||
|
||||
setResponseDefaultHeaders(response);
|
||||
|
||||
std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void WriteBufferFromHTTPServerResponse::finishSendHeaders()
|
||||
{
|
||||
if (!headers_finished_sending)
|
||||
{
|
||||
headers_finished_sending = true;
|
||||
|
||||
/// Send end of headers delimiter.
|
||||
*response_header_ostr << "\r\n" << std::flush;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void WriteBufferFromHTTPServerResponse::nextImpl()
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
startSendHeaders();
|
||||
|
||||
if (!out)
|
||||
{
|
||||
if (compress)
|
||||
{
|
||||
if (compression_method == ZlibCompressionMethod::Gzip)
|
||||
*response_header_ostr << "Content-Encoding: gzip\r\n";
|
||||
else if (compression_method == ZlibCompressionMethod::Zlib)
|
||||
*response_header_ostr << "Content-Encoding: deflate\r\n";
|
||||
else
|
||||
throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
/// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy.
|
||||
out_raw.emplace(*response_body_ostr);
|
||||
deflating_buf.emplace(out_raw.value(), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
|
||||
out = &deflating_buf.value();
|
||||
}
|
||||
else
|
||||
{
|
||||
out_raw.emplace(*response_body_ostr, working_buffer.size(), working_buffer.begin());
|
||||
out = &out_raw.value();
|
||||
}
|
||||
}
|
||||
|
||||
finishSendHeaders();
|
||||
}
|
||||
|
||||
out->position() = position();
|
||||
out->next();
|
||||
}
|
||||
|
||||
|
||||
WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
|
||||
Poco::Net::HTTPServerResponse & response_,
|
||||
bool compress_,
|
||||
ZlibCompressionMethod compression_method_,
|
||||
size_t size)
|
||||
: BufferWithOwnMemory<WriteBuffer>(size), response(response_),
|
||||
compress(compress_), compression_method(compression_method_)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
void WriteBufferFromHTTPServerResponse::onProgress(const Progress & progress)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
/// Cannot add new headers if body was started to send.
|
||||
if (headers_finished_sending)
|
||||
return;
|
||||
|
||||
accumulated_progress.incrementPiecewiseAtomically(progress);
|
||||
|
||||
if (progress_watch.elapsed() >= send_progress_interval_ms * 1000000)
|
||||
{
|
||||
progress_watch.restart();
|
||||
|
||||
/// Send all common headers before our special progress headers.
|
||||
startSendHeaders();
|
||||
|
||||
std::string progress_string;
|
||||
{
|
||||
WriteBufferFromString progress_string_writer(progress_string);
|
||||
accumulated_progress.writeJSON(progress_string_writer);
|
||||
}
|
||||
|
||||
*response_header_ostr << "X-ClickHouse-Progress: " << progress_string << "\r\n" << std::flush;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void WriteBufferFromHTTPServerResponse::finalize()
|
||||
{
|
||||
if (offset())
|
||||
{
|
||||
next();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// If no remaining data, just send headers.
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
startSendHeaders();
|
||||
finishSendHeaders();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
|
||||
{
|
||||
try
|
||||
{
|
||||
finalize();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user