Merge pull request #56064 from ClickHouse/feature-server-iface-metrics

Use CH Buffer for HTTP out stream, add metrics for interfaces
This commit is contained in:
Yakov Olkhovskiy 2024-01-02 10:17:52 -05:00 committed by GitHub
commit 5633fb8145
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
80 changed files with 966 additions and 479 deletions

View File

@ -2,6 +2,7 @@
#include "CatBoostLibraryHandler.h"
#include "CatBoostLibraryHandlerFactory.h"
#include "Common/ProfileEvents.h"
#include "ExternalDictionaryLibraryHandler.h"
#include "ExternalDictionaryLibraryHandlerFactory.h"
@ -44,7 +45,7 @@ namespace
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(&Poco::Logger::get("LibraryBridge"), fmt::runtime(message));
}
@ -96,7 +97,7 @@ ExternalDictionaryLibraryBridgeRequestHandler::ExternalDictionaryLibraryBridgeRe
}
void ExternalDictionaryLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ExternalDictionaryLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(getContext()->getSettingsRef(), request);
@ -384,7 +385,7 @@ ExternalDictionaryLibraryBridgeExistsHandler::ExternalDictionaryLibraryBridgeExi
}
void ExternalDictionaryLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ExternalDictionaryLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{
@ -423,7 +424,7 @@ CatBoostLibraryBridgeRequestHandler::CatBoostLibraryBridgeRequestHandler(
}
void CatBoostLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void CatBoostLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(getContext()->getSettingsRef(), request);
@ -621,7 +622,7 @@ CatBoostLibraryBridgeExistsHandler::CatBoostLibraryBridgeExistsHandler(size_t ke
}
void CatBoostLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void CatBoostLibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{

View File

@ -20,7 +20,7 @@ class ExternalDictionaryLibraryBridgeRequestHandler : public HTTPRequestHandler,
public:
ExternalDictionaryLibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
static constexpr inline auto FORMAT = "RowBinary";
@ -36,7 +36,7 @@ class ExternalDictionaryLibraryBridgeExistsHandler : public HTTPRequestHandler,
public:
ExternalDictionaryLibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
const size_t keep_alive_timeout;
@ -65,7 +65,7 @@ class CatBoostLibraryBridgeRequestHandler : public HTTPRequestHandler, WithConte
public:
CatBoostLibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
const size_t keep_alive_timeout;
@ -79,7 +79,7 @@ class CatBoostLibraryBridgeExistsHandler : public HTTPRequestHandler, WithContex
public:
CatBoostLibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
const size_t keep_alive_timeout;

View File

@ -69,7 +69,7 @@ namespace
}
void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
@ -78,7 +78,7 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(log, fmt::runtime(message));
};

View File

@ -23,7 +23,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -21,7 +21,7 @@
namespace DB
{
void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
@ -30,7 +30,7 @@ void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServ
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
response.send()->writeln(message);
LOG_WARNING(log, fmt::runtime(message));
};

View File

@ -21,7 +21,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -46,12 +46,12 @@ void ODBCHandler::processError(HTTPServerResponse & response, const std::string
{
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(log, fmt::runtime(message));
}
void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request);
LOG_TRACE(log, "Request URI: {}", request.getURI());

View File

@ -30,7 +30,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -6,7 +6,7 @@
namespace DB
{
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{

View File

@ -10,7 +10,7 @@ class PingHandler : public HTTPRequestHandler
{
public:
explicit PingHandler(size_t keep_alive_timeout_) : keep_alive_timeout(keep_alive_timeout_) {}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
size_t keep_alive_timeout;

View File

@ -29,7 +29,7 @@ namespace
}
void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
HTMLForm params(getContext()->getSettingsRef(), request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
@ -38,7 +38,7 @@ void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServer
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
*response.send() << message << '\n';
LOG_WARNING(log, fmt::runtime(message));
};

View File

@ -24,7 +24,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
Poco::Logger * log;

View File

@ -152,6 +152,18 @@ namespace ProfileEvents
{
extern const Event MainConfigLoads;
extern const Event ServerStartupMilliseconds;
extern const Event InterfaceNativeSendBytes;
extern const Event InterfaceNativeReceiveBytes;
extern const Event InterfaceHTTPSendBytes;
extern const Event InterfaceHTTPReceiveBytes;
extern const Event InterfacePrometheusSendBytes;
extern const Event InterfacePrometheusReceiveBytes;
extern const Event InterfaceInterserverSendBytes;
extern const Event InterfaceInterserverReceiveBytes;
extern const Event InterfaceMySQLSendBytes;
extern const Event InterfaceMySQLReceiveBytes;
extern const Event InterfacePostgreSQLSendBytes;
extern const Event InterfacePostgreSQLReceiveBytes;
}
namespace fs = std::filesystem;
@ -2047,7 +2059,7 @@ std::unique_ptr<TCPProtocolStackFactory> Server::buildProtocolStackFromConfig(
auto create_factory = [&](const std::string & type, const std::string & conf_name) -> TCPServerConnectionFactory::Ptr
{
if (type == "tcp")
return TCPServerConnectionFactory::Ptr(new TCPHandlerFactory(*this, false, false));
return TCPServerConnectionFactory::Ptr(new TCPHandlerFactory(*this, false, false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes));
if (type == "tls")
#if USE_SSL
@ -2059,20 +2071,20 @@ std::unique_ptr<TCPProtocolStackFactory> Server::buildProtocolStackFromConfig(
if (type == "proxy1")
return TCPServerConnectionFactory::Ptr(new ProxyV1HandlerFactory(*this, conf_name));
if (type == "mysql")
return TCPServerConnectionFactory::Ptr(new MySQLHandlerFactory(*this));
return TCPServerConnectionFactory::Ptr(new MySQLHandlerFactory(*this, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes));
if (type == "postgres")
return TCPServerConnectionFactory::Ptr(new PostgreSQLHandlerFactory(*this));
return TCPServerConnectionFactory::Ptr(new PostgreSQLHandlerFactory(*this, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes));
if (type == "http")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"))
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes)
);
if (type == "prometheus")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"))
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), ProfileEvents::InterfacePrometheusReceiveBytes, ProfileEvents::InterfacePrometheusSendBytes)
);
if (type == "interserver")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"))
new HTTPServerConnectionFactory(httpContext(), http_params, createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"), ProfileEvents::InterfaceInterserverReceiveBytes, ProfileEvents::InterfaceInterserverSendBytes)
);
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol configuration error, unknown protocol name '{}'", type);
@ -2205,7 +2217,7 @@ void Server::createServers(
port_name,
"http://" + address.toString(),
std::make_unique<HTTPServer>(
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes));
});
}
@ -2225,7 +2237,7 @@ void Server::createServers(
port_name,
"https://" + address.toString(),
std::make_unique<HTTPServer>(
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));
httpContext(), createHandlerFactory(*this, config, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfaceHTTPReceiveBytes, ProfileEvents::InterfaceHTTPSendBytes));
#else
UNUSED(port);
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "HTTPS protocol is disabled because Poco library was built without NetSSL support.");
@ -2248,7 +2260,7 @@ void Server::createServers(
port_name,
"native protocol (tcp): " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false),
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
@ -2270,7 +2282,7 @@ void Server::createServers(
port_name,
"native protocol (tcp) with PROXY: " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true),
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
@ -2293,7 +2305,7 @@ void Server::createServers(
port_name,
"secure native protocol (tcp_secure): " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false),
new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
@ -2317,7 +2329,7 @@ void Server::createServers(
listen_host,
port_name,
"MySQL compatibility protocol: " + address.toString(),
std::make_unique<TCPServer>(new MySQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
std::make_unique<TCPServer>(new MySQLHandlerFactory(*this, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes), server_pool, socket, new Poco::Net::TCPServerParams));
});
}
@ -2334,7 +2346,7 @@ void Server::createServers(
listen_host,
port_name,
"PostgreSQL compatibility protocol: " + address.toString(),
std::make_unique<TCPServer>(new PostgreSQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
std::make_unique<TCPServer>(new PostgreSQLHandlerFactory(*this, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes), server_pool, socket, new Poco::Net::TCPServerParams));
});
}
@ -2368,7 +2380,7 @@ void Server::createServers(
port_name,
"Prometheus: http://" + address.toString(),
std::make_unique<HTTPServer>(
httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params, ProfileEvents::InterfacePrometheusReceiveBytes, ProfileEvents::InterfacePrometheusSendBytes));
});
}
}
@ -2414,7 +2426,9 @@ void Server::createInterserverServers(
createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
http_params));
http_params,
ProfileEvents::InterfaceInterserverReceiveBytes,
ProfileEvents::InterfaceInterserverSendBytes));
});
}
@ -2437,7 +2451,9 @@ void Server::createInterserverServers(
createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPSHandler-factory"),
server_pool,
socket,
http_params));
http_params,
ProfileEvents::InterfaceInterserverReceiveBytes,
ProfileEvents::InterfaceInterserverSendBytes));
#else
UNUSED(port);
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");

View File

@ -587,6 +587,19 @@ The server successfully detected this situation and will download merged part fr
M(LogError, "Number of log messages with level Error") \
M(LogFatal, "Number of log messages with level Fatal") \
\
M(InterfaceHTTPSendBytes, "Number of bytes sent through HTTP interfaces") \
M(InterfaceHTTPReceiveBytes, "Number of bytes received through HTTP interfaces") \
M(InterfaceNativeSendBytes, "Number of bytes sent through native interfaces") \
M(InterfaceNativeReceiveBytes, "Number of bytes received through native interfaces") \
M(InterfacePrometheusSendBytes, "Number of bytes sent through Prometheus interfaces") \
M(InterfacePrometheusReceiveBytes, "Number of bytes received through Prometheus interfaces") \
M(InterfaceInterserverSendBytes, "Number of bytes sent through interserver interfaces") \
M(InterfaceInterserverReceiveBytes, "Number of bytes received through interserver interfaces") \
M(InterfaceMySQLSendBytes, "Number of bytes sent through MySQL interfaces") \
M(InterfaceMySQLReceiveBytes, "Number of bytes received through MySQL interfaces") \
M(InterfacePostgreSQLSendBytes, "Number of bytes sent through PostgreSQL interfaces") \
M(InterfacePostgreSQLReceiveBytes, "Number of bytes received through PostgreSQL interfaces") \
\
M(ParallelReplicasUsedCount, "Number of replicas used to execute a query with task-based parallel replicas") \
#ifdef APPLY_FOR_EXTERNAL_EVENTS

View File

@ -13,33 +13,14 @@ namespace ErrorCodes
}
class BrotliWriteBuffer::BrotliStateWrapper
BrotliWriteBuffer::BrotliStateWrapper::BrotliStateWrapper()
: state(BrotliEncoderCreateInstance(nullptr, nullptr, nullptr))
{
public:
BrotliStateWrapper()
: state(BrotliEncoderCreateInstance(nullptr, nullptr, nullptr))
{
}
}
~BrotliStateWrapper()
{
BrotliEncoderDestroyInstance(state);
}
BrotliEncoderState * state;
};
BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
BrotliWriteBuffer::BrotliStateWrapper::~BrotliStateWrapper()
{
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast<uint32_t>(compression_level));
// Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81)
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_LGWIN, 24);
BrotliEncoderDestroyInstance(state);
}
BrotliWriteBuffer::~BrotliWriteBuffer() = default;
@ -58,18 +39,20 @@ void BrotliWriteBuffer::nextImpl()
{
do
{
const auto * in_data_ptr = in_data;
out->nextIfAtEnd();
out_data = reinterpret_cast<unsigned char *>(out->position());
out_capacity = out->buffer().end() - out->position();
int result = BrotliEncoderCompressStream(
brotli->state,
in_available ? BROTLI_OPERATION_PROCESS : BROTLI_OPERATION_FINISH,
BROTLI_OPERATION_PROCESS,
&in_available,
&in_data,
&out_capacity,
&out_data,
nullptr);
total_in += in_data - in_data_ptr;
out->position() = out->buffer().end() - out_capacity;
@ -92,6 +75,10 @@ void BrotliWriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && total_in == 0)
return;
while (true)
{
out->nextIfAtEnd();

View File

@ -4,18 +4,38 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferDecorator.h>
#include "config.h"
#if USE_BROTLI
# include <brotli/encode.h>
namespace DB
{
class BrotliWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
BrotliWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, brotli(std::make_unique<BrotliStateWrapper>())
, in_available(0)
, in_data(nullptr)
, out_capacity(0)
, out_data(nullptr)
, compress_empty(compress_empty_)
{
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_QUALITY, static_cast<uint32_t>(compression_level));
// Set LZ77 window size. According to brotli sources default value is 24 (c/tools/brotli.c:81)
BrotliEncoderSetParameter(brotli->state, BROTLI_PARAM_LGWIN, 24);
}
~BrotliWriteBuffer() override;
@ -24,7 +44,15 @@ private:
void finalizeBefore() override;
class BrotliStateWrapper;
class BrotliStateWrapper
{
public:
BrotliStateWrapper();
~BrotliStateWrapper();
BrotliEncoderState * state;
};
std::unique_ptr<BrotliStateWrapper> brotli;
@ -33,6 +61,12 @@ private:
size_t out_capacity;
uint8_t * out_data;
protected:
UInt64 total_in = 0;
bool compress_empty = true;
};
}
#endif

View File

@ -2,6 +2,7 @@
#include <Core/Defines.h>
#include <algorithm>
#include <memory>
namespace DB

View File

@ -15,34 +15,22 @@ namespace ErrorCodes
}
class Bzip2WriteBuffer::Bzip2StateWrapper
Bzip2WriteBuffer::Bzip2StateWrapper::Bzip2StateWrapper(int compression_level)
{
public:
explicit Bzip2StateWrapper(int compression_level)
{
memset(&stream, 0, sizeof(stream));
memset(&stream, 0, sizeof(stream));
int ret = BZ2_bzCompressInit(&stream, compression_level, 0, 0);
int ret = BZ2_bzCompressInit(&stream, compression_level, 0, 0);
if (ret != BZ_OK)
throw Exception(
ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
"bzip2 stream encoder init failed: error code: {}",
ret);
}
if (ret != BZ_OK)
throw Exception(
ErrorCodes::BZIP2_STREAM_ENCODER_FAILED,
"bzip2 stream encoder init failed: error code: {}",
ret);
}
~Bzip2StateWrapper()
{
BZ2_bzCompressEnd(&stream);
}
bz_stream stream;
};
Bzip2WriteBuffer::Bzip2WriteBuffer(std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, bz(std::make_unique<Bzip2StateWrapper>(compression_level))
Bzip2WriteBuffer::Bzip2StateWrapper::~Bzip2StateWrapper()
{
BZ2_bzCompressEnd(&stream);
}
Bzip2WriteBuffer::~Bzip2WriteBuffer() = default;
@ -77,6 +65,8 @@ void Bzip2WriteBuffer::nextImpl()
}
while (bz->stream.avail_in > 0);
total_in += offset();
}
catch (...)
{
@ -90,6 +80,10 @@ void Bzip2WriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && total_in == 0)
return;
out->nextIfAtEnd();
bz->stream.next_out = out->position();
bz->stream.avail_out = static_cast<unsigned>(out->buffer().end() - out->position());

View File

@ -4,18 +4,29 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferDecorator.h>
#include "config.h"
#if USE_BZIP2
# include <bzlib.h>
namespace DB
{
class Bzip2WriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
Bzip2WriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), bz(std::make_unique<Bzip2StateWrapper>(compression_level))
, compress_empty(compress_empty_)
{
}
~Bzip2WriteBuffer() override;
@ -24,8 +35,20 @@ private:
void finalizeBefore() override;
class Bzip2StateWrapper;
class Bzip2StateWrapper
{
public:
explicit Bzip2StateWrapper(int compression_level);
~Bzip2StateWrapper();
bz_stream stream;
};
std::unique_ptr<Bzip2StateWrapper> bz;
bool compress_empty = true;
UInt64 total_in = 0;
};
}
#endif

View File

@ -169,37 +169,66 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
return createCompressedWrapper(std::move(nested), method, buf_size, existing_memory, alignment, zstd_window_log_max);
}
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment)
template<typename WriteBufferT>
std::unique_ptr<WriteBuffer> createWriteCompressedWrapper(
WriteBufferT && nested, CompressionMethod method, int level, size_t buf_size, char * existing_memory, size_t alignment, bool compress_empty)
{
if (method == DB::CompressionMethod::Gzip || method == CompressionMethod::Zlib)
return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(nested), method, level, buf_size, existing_memory, alignment);
return std::make_unique<ZlibDeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), method, level, buf_size, existing_memory, alignment, compress_empty);
#if USE_BROTLI
if (method == DB::CompressionMethod::Brotli)
return std::make_unique<BrotliWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<BrotliWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#endif
if (method == CompressionMethod::Xz)
return std::make_unique<LZMADeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<LZMADeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
if (method == CompressionMethod::Zstd)
return std::make_unique<ZstdDeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<ZstdDeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
if (method == CompressionMethod::Lz4)
return std::make_unique<Lz4DeflatingWriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<Lz4DeflatingWriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#if USE_BZIP2
if (method == CompressionMethod::Bzip2)
return std::make_unique<Bzip2WriteBuffer>(std::move(nested), level, buf_size, existing_memory, alignment);
return std::make_unique<Bzip2WriteBuffer>(std::forward<WriteBufferT>(nested), level, buf_size, existing_memory, alignment, compress_empty);
#endif
#if USE_SNAPPY
if (method == CompressionMethod::Snappy)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method");
#endif
if (method == CompressionMethod::None)
return nested;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported compression method");
}
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested,
CompressionMethod method,
int level,
size_t buf_size,
char * existing_memory,
size_t alignment,
bool compress_empty)
{
if (method == CompressionMethod::None)
return nested;
return createWriteCompressedWrapper(nested, method, level, buf_size, existing_memory, alignment, compress_empty);
}
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
WriteBuffer * nested,
CompressionMethod method,
int level,
size_t buf_size,
char * existing_memory,
size_t alignment,
bool compress_empty)
{
assert(method != CompressionMethod::None);
return createWriteCompressedWrapper(nested, method, level, buf_size, existing_memory, alignment, compress_empty);
}
}

View File

@ -61,13 +61,22 @@ std::unique_ptr<ReadBuffer> wrapReadBufferWithCompressionMethod(
char * existing_memory = nullptr,
size_t alignment = 0);
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
std::unique_ptr<WriteBuffer> nested,
CompressionMethod method,
int level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty = true);
std::unique_ptr<WriteBuffer> wrapWriteBufferWithCompressionMethod(
WriteBuffer * nested,
CompressionMethod method,
int level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0,
bool compress_empty = true);
}

View File

@ -7,9 +7,7 @@ namespace ErrorCodes
extern const int LZMA_STREAM_ENCODER_FAILED;
}
LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
void LZMADeflatingWriteBuffer::initialize(int compression_level)
{
lstr = LZMA_STREAM_INIT;
@ -94,6 +92,10 @@ void LZMADeflatingWriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && lstr.total_out == 0)
return;
do
{
out->nextIfAtEnd();

View File

@ -14,22 +14,32 @@ namespace DB
class LZMADeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
LZMADeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), compress_empty(compress_empty_)
{
initialize(compression_level);
}
~LZMADeflatingWriteBuffer() override;
private:
void initialize(int compression_level);
void nextImpl() override;
void finalizeBefore() override;
void finalizeAfter() override;
lzma_stream lstr;
bool compress_empty = true;
};
}

View File

@ -63,11 +63,8 @@ namespace ErrorCodes
extern const int LZ4_ENCODER_FAILED;
}
Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, tmp_memory(buf_size)
void Lz4DeflatingWriteBuffer::initialize(int compression_level)
{
kPrefs = {
{LZ4F_max256KB,
@ -105,7 +102,7 @@ void Lz4DeflatingWriteBuffer::nextImpl()
if (first_time)
{
auto sink = SinkToOut(out.get(), tmp_memory, LZ4F_HEADER_SIZE_MAX);
auto sink = SinkToOut(out, tmp_memory, LZ4F_HEADER_SIZE_MAX);
chassert(sink.getCapacity() >= LZ4F_HEADER_SIZE_MAX);
/// write frame header and check for errors
@ -131,7 +128,7 @@ void Lz4DeflatingWriteBuffer::nextImpl()
/// Ensure that there is enough space for compressed block of minimal size
size_t min_compressed_block_size = LZ4F_compressBound(1, &kPrefs);
auto sink = SinkToOut(out.get(), tmp_memory, min_compressed_block_size);
auto sink = SinkToOut(out, tmp_memory, min_compressed_block_size);
chassert(sink.getCapacity() >= min_compressed_block_size);
/// LZ4F_compressUpdate compresses whole input buffer at once so we need to shink it manually
@ -163,8 +160,12 @@ void Lz4DeflatingWriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && first_time)
return;
auto suffix_size = LZ4F_compressBound(0, &kPrefs);
auto sink = SinkToOut(out.get(), tmp_memory, suffix_size);
auto sink = SinkToOut(out, tmp_memory, suffix_size);
chassert(sink.getCapacity() >= suffix_size);
/// compression end

View File

@ -14,16 +14,26 @@ namespace DB
class Lz4DeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
Lz4DeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, tmp_memory(buf_size)
, compress_empty(compress_empty_)
{
initialize(compression_level);
}
~Lz4DeflatingWriteBuffer() override;
private:
void initialize(int compression_level);
void nextImpl() override;
void finalizeBefore() override;
@ -35,5 +45,6 @@ private:
Memory<> tmp_memory;
bool first_time = true;
bool compress_empty = true;
};
}

View File

@ -99,6 +99,9 @@ bool ReadBufferFromPocoSocket::nextImpl()
if (bytes_read < 0)
throw NetException(ErrorCodes::CANNOT_READ_FROM_SOCKET, "Cannot read from socket ({})", peer_address.toString());
if (read_event != ProfileEvents::end())
ProfileEvents::increment(read_event, bytes_read);
if (bytes_read)
working_buffer.resize(bytes_read);
else
@ -111,10 +114,17 @@ ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_,
: BufferWithOwnMemory<ReadBuffer>(buf_size)
, socket(socket_)
, peer_address(socket.peerAddress())
, read_event(ProfileEvents::end())
, socket_description("socket (" + peer_address.toString() + ")")
{
}
ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size)
: ReadBufferFromPocoSocket(socket_, buf_size)
{
read_event = read_event_;
}
bool ReadBufferFromPocoSocket::poll(size_t timeout_microseconds) const
{
if (available())

View File

@ -20,10 +20,13 @@ protected:
*/
Poco::Net::SocketAddress peer_address;
ProfileEvents::Event read_event;
bool nextImpl() override;
public:
explicit ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
explicit ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & read_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
bool poll(size_t timeout_microseconds) const;

View File

@ -12,13 +12,21 @@ class WriteBuffer;
/// WriteBuffer that decorates data and delegates it to underlying buffer.
/// It's used for writing compressed and encrypted data
/// This class can own or not own underlying buffer - constructor will differentiate
/// std::unique_ptr<WriteBuffer> for owning and WriteBuffer* for not owning.
template <class Base>
class WriteBufferDecorator : public Base
{
public:
template <class ... BaseArgs>
explicit WriteBufferDecorator(std::unique_ptr<WriteBuffer> out_, BaseArgs && ... args)
: Base(std::forward<BaseArgs>(args)...), out(std::move(out_))
: Base(std::forward<BaseArgs>(args)...), owning_holder(std::move(out_)), out(owning_holder.get())
{
}
template <class ... BaseArgs>
explicit WriteBufferDecorator(WriteBuffer * out_, BaseArgs && ... args)
: Base(std::forward<BaseArgs>(args)...), out(out_)
{
}
@ -38,7 +46,7 @@ public:
}
}
WriteBuffer * getNestedBuffer() { return out.get(); }
WriteBuffer * getNestedBuffer() { return out; }
protected:
/// Do some finalization before finalization of underlying buffer.
@ -47,7 +55,8 @@ protected:
/// Do some finalization after finalization of underlying buffer.
virtual void finalizeAfter() {}
std::unique_ptr<WriteBuffer> out;
std::unique_ptr<WriteBuffer> owning_holder;
WriteBuffer * out;
};
using WriteBufferWithOwnMemoryDecorator = WriteBufferDecorator<BufferWithOwnMemory<WriteBuffer>>;

View File

@ -28,7 +28,7 @@ public:
void sync() override;
std::string getFileName() const override { return assert_cast<WriteBufferFromFileBase *>(out.get())->getFileName(); }
std::string getFileName() const override { return assert_cast<WriteBufferFromFileBase *>(out)->getFileName(); }
private:
void nextImpl() override;

View File

@ -34,6 +34,97 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
ssize_t WriteBufferFromPocoSocket::socketSendBytesImpl(const char * ptr, size_t size)
{
ssize_t res = 0;
/// If async_callback is specified, set socket to non-blocking mode
/// and try to write data to it, if socket is not ready for writing,
/// run async_callback and try again later.
/// It is expected that file descriptor may be polled externally.
/// Note that send timeout is not checked here. External code should check it while polling.
if (async_callback)
{
socket.setBlocking(false);
/// Set socket to blocking mode at the end.
SCOPE_EXIT(socket.setBlocking(true));
bool secure = socket.secure();
res = socket.impl()->sendBytes(ptr, static_cast<int>(size));
/// Check EAGAIN and ERR_SSL_WANT_WRITE/ERR_SSL_WANT_READ for secure socket (writing to secure socket can read too).
while (res < 0 && (errno == EAGAIN || (secure && (checkSSLWantRead(res) || checkSSLWantWrite(res)))))
{
/// In case of ERR_SSL_WANT_READ we should wait for socket to be ready for reading, otherwise - for writing.
if (secure && checkSSLWantRead(res))
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
else
async_callback(socket.impl()->sockfd(), socket.getSendTimeout(), AsyncEventTimeoutType::SEND, socket_description, AsyncTaskExecutor::Event::WRITE | AsyncTaskExecutor::Event::ERROR);
/// Try to write again.
res = socket.impl()->sendBytes(ptr, static_cast<int>(size));
}
}
else
{
res = socket.impl()->sendBytes(ptr, static_cast<int>(size));
}
return res;
}
void WriteBufferFromPocoSocket::socketSendBytes(const char * ptr, size_t size)
{
if (!size)
return;
Stopwatch watch;
size_t bytes_written = 0;
SCOPE_EXIT({
ProfileEvents::increment(ProfileEvents::NetworkSendElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::NetworkSendBytes, bytes_written);
if (write_event != ProfileEvents::end())
ProfileEvents::increment(write_event, bytes_written);
});
while (bytes_written < size)
{
ssize_t res = 0;
/// Add more details to exceptions.
try
{
CurrentMetrics::Increment metric_increment(CurrentMetrics::NetworkSend);
if (size > INT_MAX)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow");
res = socketSendBytesImpl(ptr + bytes_written, size - bytes_written);
}
catch (const Poco::Net::NetException & e)
{
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({} -> {})", e.displayText(),
our_address.toString(), peer_address.toString());
}
catch (const Poco::TimeoutException &)
{
throw NetException(ErrorCodes::SOCKET_TIMEOUT, "Timeout exceeded while writing to socket ({}, {} ms)",
peer_address.toString(),
socket.impl()->getSendTimeout().totalMilliseconds());
}
catch (const Poco::IOException & e)
{
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({} -> {})", e.displayText(),
our_address.toString(), peer_address.toString());
}
if (res < 0)
throw NetException(ErrorCodes::CANNOT_WRITE_TO_SOCKET, "Cannot write to socket ({} -> {})",
our_address.toString(), peer_address.toString());
bytes_written += res;
}
}
void WriteBufferFromPocoSocket::nextImpl()
{
if (!offset())
@ -60,36 +151,7 @@ void WriteBufferFromPocoSocket::nextImpl()
if (size > INT_MAX)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer overflow");
/// If async_callback is specified, set socket to non-blocking mode
/// and try to write data to it, if socket is not ready for writing,
/// run async_callback and try again later.
/// It is expected that file descriptor may be polled externally.
/// Note that send timeout is not checked here. External code should check it while polling.
if (async_callback)
{
socket.setBlocking(false);
/// Set socket to blocking mode at the end.
SCOPE_EXIT(socket.setBlocking(true));
bool secure = socket.secure();
res = socket.impl()->sendBytes(pos, static_cast<int>(size));
/// Check EAGAIN and ERR_SSL_WANT_WRITE/ERR_SSL_WANT_READ for secure socket (writing to secure socket can read too).
while (res < 0 && (errno == EAGAIN || (secure && (checkSSLWantRead(res) || checkSSLWantWrite(res)))))
{
/// In case of ERR_SSL_WANT_READ we should wait for socket to be ready for reading, otherwise - for writing.
if (secure && checkSSLWantRead(res))
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), AsyncEventTimeoutType::RECEIVE, socket_description, AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
else
async_callback(socket.impl()->sockfd(), socket.getSendTimeout(), AsyncEventTimeoutType::SEND, socket_description, AsyncTaskExecutor::Event::WRITE | AsyncTaskExecutor::Event::ERROR);
/// Try to write again.
res = socket.impl()->sendBytes(pos, static_cast<int>(size));
}
}
else
{
res = socket.impl()->sendBytes(pos, static_cast<int>(size));
}
res = socketSendBytesImpl(pos, size);
}
catch (const Poco::Net::NetException & e)
{
@ -125,6 +187,12 @@ WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_
{
}
WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & write_event_, size_t buf_size)
: WriteBufferFromPocoSocket(socket_, buf_size)
{
write_event = write_event_;
}
WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
{
try

View File

@ -17,14 +17,33 @@ class WriteBufferFromPocoSocket : public BufferWithOwnMemory<WriteBuffer>
{
public:
explicit WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
explicit WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & write_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
~WriteBufferFromPocoSocket() override;
void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); }
using WriteBuffer::write;
void write(const std::string & str) { WriteBuffer::write(str.c_str(), str.size()); }
void write(std::string_view str) { WriteBuffer::write(str.data(), str.size()); }
void write(const char * str) { WriteBuffer::write(str, strlen(str)); }
void writeln(const std::string & str) { write(str); WriteBuffer::write("\n", 1); }
void writeln(std::string_view str) { write(str); WriteBuffer::write("\n", 1); }
void writeln(const char * str) { write(str); WriteBuffer::write("\n", 1); }
protected:
void nextImpl() override;
void socketSendBytes(const char * ptr, size_t size);
void socketSendStr(const std::string & str)
{
return socketSendBytes(str.data(), str.size());
}
void socketSendStr(const char * ptr)
{
return socketSendBytes(ptr, strlen(ptr));
}
Poco::Net::Socket & socket;
/** For error messages. It is necessary to receive this address in advance, because,
@ -34,9 +53,13 @@ protected:
Poco::Net::SocketAddress peer_address;
Poco::Net::SocketAddress our_address;
ProfileEvents::Event write_event;
private:
AsyncCallback async_callback;
std::string socket_description;
ssize_t socketSendBytesImpl(const char * ptr, size_t size);
};
}

View File

@ -63,9 +63,7 @@ namespace ErrorCodes
inline void writeChar(char x, WriteBuffer & buf)
{
buf.nextIfAtEnd();
*buf.position() = x;
++buf.position();
buf.write(x);
}
/// Write the same character n times.

View File

@ -10,36 +10,6 @@ namespace ErrorCodes
extern const int ZLIB_DEFLATE_FAILED;
}
ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
CompressionMethod compression_method,
int compression_level,
size_t buf_size,
char * existing_memory,
size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
{
zstr.zalloc = nullptr;
zstr.zfree = nullptr;
zstr.opaque = nullptr;
zstr.next_in = nullptr;
zstr.avail_in = 0;
zstr.next_out = nullptr;
zstr.avail_out = 0;
int window_bits = 15;
if (compression_method == CompressionMethod::Gzip)
{
window_bits += 16;
}
int rc = deflateInit2(&zstr, compression_level, Z_DEFLATED, window_bits, 8, Z_DEFAULT_STRATEGY);
if (rc != Z_OK)
throw Exception(ErrorCodes::ZLIB_DEFLATE_FAILED, "deflateInit2 failed: {}; zlib version: {}", zError(rc), ZLIB_VERSION);
}
void ZlibDeflatingWriteBuffer::nextImpl()
{
if (!offset())
@ -82,6 +52,10 @@ void ZlibDeflatingWriteBuffer::finalizeBefore()
{
next();
/// Don't write out if no data was ever compressed
if (!compress_empty && zstr.total_out == 0)
return;
/// https://github.com/zlib-ng/zlib-ng/issues/494
do
{

View File

@ -12,17 +12,45 @@
namespace DB
{
namespace ErrorCodes
{
extern const int ZLIB_DEFLATE_FAILED;
}
/// Performs compression using zlib library and writes compressed data to out_ WriteBuffer.
class ZlibDeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
ZlibDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
CompressionMethod compression_method,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), compress_empty(compress_empty_)
{
zstr.zalloc = nullptr;
zstr.zfree = nullptr;
zstr.opaque = nullptr;
zstr.next_in = nullptr;
zstr.avail_in = 0;
zstr.next_out = nullptr;
zstr.avail_out = 0;
int window_bits = 15;
if (compression_method == CompressionMethod::Gzip)
{
window_bits += 16;
}
int rc = deflateInit2(&zstr, compression_level, Z_DEFLATED, window_bits, 8, Z_DEFAULT_STRATEGY);
if (rc != Z_OK)
throw Exception(ErrorCodes::ZLIB_DEFLATE_FAILED, "deflateInit2 failed: {}; zlib version: {}", zError(rc), ZLIB_VERSION);
}
~ZlibDeflatingWriteBuffer() override;
@ -36,6 +64,7 @@ private:
virtual void finalizeAfter() override;
z_stream zstr;
bool compress_empty = true;
};
}

View File

@ -8,9 +8,7 @@ namespace ErrorCodes
extern const int ZSTD_ENCODER_FAILED;
}
ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
void ZstdDeflatingWriteBuffer::initialize(int compression_level)
{
cctx = ZSTD_createCCtx();
if (cctx == nullptr)
@ -44,6 +42,7 @@ void ZstdDeflatingWriteBuffer::flush(ZSTD_EndDirective mode)
try
{
size_t out_offset = out->offset();
bool ended = false;
do
{
@ -67,6 +66,8 @@ void ZstdDeflatingWriteBuffer::flush(ZSTD_EndDirective mode)
ended = everything_was_compressed && everything_was_flushed;
} while (!ended);
total_out += out->offset() - out_offset;
}
catch (...)
{
@ -84,6 +85,9 @@ void ZstdDeflatingWriteBuffer::nextImpl()
void ZstdDeflatingWriteBuffer::finalizeBefore()
{
/// Don't write out if no data was ever compressed
if (!compress_empty && total_out == 0)
return;
flush(ZSTD_e_end);
}

View File

@ -14,12 +14,18 @@ namespace DB
class ZstdDeflatingWriteBuffer : public WriteBufferWithOwnMemoryDecorator
{
public:
template<typename WriteBufferT>
ZstdDeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
WriteBufferT && out_,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
size_t alignment = 0,
bool compress_empty_ = true)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment), compress_empty(compress_empty_)
{
initialize(compression_level);
}
~ZstdDeflatingWriteBuffer() override;
@ -29,6 +35,8 @@ public:
}
private:
void initialize(int compression_level);
void nextImpl() override;
/// Flush all pending data and write zstd footer to the underlying buffer.
@ -42,6 +50,9 @@ private:
ZSTD_CCtx * cctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;
size_t total_out = 0;
bool compress_empty = true;
};
}

View File

@ -112,8 +112,7 @@ public:
throw Exception(ErrorCodes::SESSION_NOT_FOUND, "Session {} not found", session_id);
/// Create a new session from current context.
auto context = Context::createCopy(global_context);
it = sessions.insert(std::make_pair(key, std::make_shared<NamedSessionData>(key, context, timeout, *this))).first;
it = sessions.insert(std::make_pair(key, std::make_shared<NamedSessionData>(key, global_context, timeout, *this))).first;
const auto & session = it->second;
if (!thread.joinable())
@ -128,7 +127,7 @@ public:
/// Use existing session.
const auto & session = it->second;
LOG_TEST(log, "Reuse session from storage with session_id: {}, user_id: {}", key.second, key.first);
LOG_TRACE(log, "Reuse session from storage with session_id: {}, user_id: {}", key.second, key.first);
if (!session.unique())
throw Exception(ErrorCodes::SESSION_IS_LOCKED, "Session {} is locked by a concurrent client", session_id);
@ -703,6 +702,10 @@ void Session::releaseSessionID()
{
if (!named_session)
return;
prepared_client_info = getClientInfo();
session_context.reset();
named_session->release();
named_session = nullptr;
}

View File

@ -8,6 +8,7 @@
#include <chrono>
#include <memory>
#include <mutex>
#include <optional>
namespace Poco::Net { class SocketAddress; }

View File

@ -13,7 +13,8 @@ class HTTPRequestHandler : private boost::noncopyable
public:
virtual ~HTTPRequestHandler() = default;
virtual void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) = 0;
virtual void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) = 0;
virtual void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { handleRequest(request, response, ProfileEvents::end()); }
};
}

View File

@ -10,8 +10,10 @@ HTTPServer::HTTPServer(
HTTPRequestHandlerFactoryPtr factory_,
Poco::ThreadPool & thread_pool,
Poco::Net::ServerSocket & socket_,
Poco::Net::HTTPServerParams::Ptr params)
: TCPServer(new HTTPServerConnectionFactory(context, params, factory_), thread_pool, socket_, params), factory(factory_)
Poco::Net::HTTPServerParams::Ptr params,
const ProfileEvents::Event & read_event,
const ProfileEvents::Event & write_event)
: TCPServer(new HTTPServerConnectionFactory(context, params, factory_, read_event, write_event), thread_pool, socket_, params), factory(factory_)
{
}

View File

@ -20,7 +20,9 @@ public:
HTTPRequestHandlerFactoryPtr factory,
Poco::ThreadPool & thread_pool,
Poco::Net::ServerSocket & socket,
Poco::Net::HTTPServerParams::Ptr params);
Poco::Net::HTTPServerParams::Ptr params,
const ProfileEvents::Event & read_event_ = ProfileEvents::end(),
const ProfileEvents::Event & write_event_ = ProfileEvents::end());
~HTTPServer() override;

View File

@ -11,8 +11,10 @@ HTTPServerConnection::HTTPServerConnection(
TCPServer & tcp_server_,
const Poco::Net::StreamSocket & socket,
Poco::Net::HTTPServerParams::Ptr params_,
HTTPRequestHandlerFactoryPtr factory_)
: TCPServerConnection(socket), context(std::move(context_)), tcp_server(tcp_server_), params(params_), factory(factory_), stopped(false)
HTTPRequestHandlerFactoryPtr factory_,
const ProfileEvents::Event & read_event_,
const ProfileEvents::Event & write_event_)
: TCPServerConnection(socket), context(std::move(context_)), tcp_server(tcp_server_), params(params_), factory(factory_), read_event(read_event_), write_event(write_event_), stopped(false)
{
poco_check_ptr(factory);
}
@ -30,7 +32,7 @@ void HTTPServerConnection::run()
if (!stopped && tcp_server.isOpen() && session.connected())
{
HTTPServerResponse response(session);
HTTPServerRequest request(context, response, session);
HTTPServerRequest request(context, response, session, read_event);
Poco::Timestamp now;
@ -65,7 +67,7 @@ void HTTPServerConnection::run()
if (request.getExpectContinue() && response.getStatus() == Poco::Net::HTTPResponse::HTTP_OK)
response.sendContinue();
handler->handleRequest(request, response);
handler->handleRequest(request, response, write_event);
session.setKeepAlive(params->getKeepAlive() && response.getKeepAlive() && session.canKeepAlive());
}
else

View File

@ -19,7 +19,9 @@ public:
TCPServer & tcp_server,
const Poco::Net::StreamSocket & socket,
Poco::Net::HTTPServerParams::Ptr params,
HTTPRequestHandlerFactoryPtr factory);
HTTPRequestHandlerFactoryPtr factory,
const ProfileEvents::Event & read_event_ = ProfileEvents::end(),
const ProfileEvents::Event & write_event_ = ProfileEvents::end());
HTTPServerConnection(
HTTPContextPtr context_,
@ -27,8 +29,10 @@ public:
const Poco::Net::StreamSocket & socket_,
Poco::Net::HTTPServerParams::Ptr params_,
HTTPRequestHandlerFactoryPtr factory_,
const String & forwarded_for_)
: HTTPServerConnection(context_, tcp_server_, socket_, params_, factory_)
const String & forwarded_for_,
const ProfileEvents::Event & read_event_ = ProfileEvents::end(),
const ProfileEvents::Event & write_event_ = ProfileEvents::end())
: HTTPServerConnection(context_, tcp_server_, socket_, params_, factory_, read_event_, write_event_)
{
forwarded_for = forwarded_for_;
}
@ -44,6 +48,8 @@ private:
Poco::Net::HTTPServerParams::Ptr params;
HTTPRequestHandlerFactoryPtr factory;
String forwarded_for;
ProfileEvents::Event read_event;
ProfileEvents::Event write_event;
bool stopped;
std::mutex mutex; // guards the |factory| with assumption that creating handlers is not thread-safe.
};

View File

@ -5,20 +5,20 @@
namespace DB
{
HTTPServerConnectionFactory::HTTPServerConnectionFactory(
HTTPContextPtr context_, Poco::Net::HTTPServerParams::Ptr params_, HTTPRequestHandlerFactoryPtr factory_)
: context(std::move(context_)), params(params_), factory(factory_)
HTTPContextPtr context_, Poco::Net::HTTPServerParams::Ptr params_, HTTPRequestHandlerFactoryPtr factory_, const ProfileEvents::Event & read_event_, const ProfileEvents::Event & write_event_)
: context(std::move(context_)), params(params_), factory(factory_), read_event(read_event_), write_event(write_event_)
{
poco_check_ptr(factory);
}
Poco::Net::TCPServerConnection * HTTPServerConnectionFactory::createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server)
{
return new HTTPServerConnection(context, tcp_server, socket, params, factory);
return new HTTPServerConnection(context, tcp_server, socket, params, factory, read_event, write_event);
}
Poco::Net::TCPServerConnection * HTTPServerConnectionFactory::createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server, TCPProtocolStackData & stack_data)
{
return new HTTPServerConnection(context, tcp_server, socket, params, factory, stack_data.forwarded_for);
return new HTTPServerConnection(context, tcp_server, socket, params, factory, stack_data.forwarded_for, read_event, write_event);
}
}

View File

@ -12,7 +12,7 @@ namespace DB
class HTTPServerConnectionFactory : public TCPServerConnectionFactory
{
public:
HTTPServerConnectionFactory(HTTPContextPtr context, Poco::Net::HTTPServerParams::Ptr params, HTTPRequestHandlerFactoryPtr factory);
HTTPServerConnectionFactory(HTTPContextPtr context, Poco::Net::HTTPServerParams::Ptr params, HTTPRequestHandlerFactoryPtr factory, const ProfileEvents::Event & read_event_ = ProfileEvents::end(), const ProfileEvents::Event & write_event_ = ProfileEvents::end());
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) override;
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server, TCPProtocolStackData & stack_data) override;
@ -21,6 +21,8 @@ private:
HTTPContextPtr context;
Poco::Net::HTTPServerParams::Ptr params;
HTTPRequestHandlerFactoryPtr factory;
ProfileEvents::Event read_event;
ProfileEvents::Event write_event;
};
}

View File

@ -22,7 +22,7 @@
namespace DB
{
HTTPServerRequest::HTTPServerRequest(HTTPContextPtr context, HTTPServerResponse & response, Poco::Net::HTTPServerSession & session)
HTTPServerRequest::HTTPServerRequest(HTTPContextPtr context, HTTPServerResponse & response, Poco::Net::HTTPServerSession & session, const ProfileEvents::Event & read_event)
: max_uri_size(context->getMaxUriSize())
, max_fields_number(context->getMaxFields())
, max_field_name_size(context->getMaxFieldNameSize())
@ -41,7 +41,7 @@ HTTPServerRequest::HTTPServerRequest(HTTPContextPtr context, HTTPServerResponse
session.socket().setReceiveTimeout(receive_timeout);
session.socket().setSendTimeout(send_timeout);
auto in = std::make_unique<ReadBufferFromPocoSocket>(session.socket());
auto in = std::make_unique<ReadBufferFromPocoSocket>(session.socket(), read_event);
socket = session.socket().impl();
readRequest(*in); /// Try parse according to RFC7230

View File

@ -4,6 +4,7 @@
#include <IO/ReadBuffer.h>
#include <Server/HTTP/HTTPRequest.h>
#include <Server/HTTP/HTTPContext.h>
#include <Common/ProfileEvents.h>
#include "config.h"
#include <Poco/Net/HTTPServerSession.h>
@ -19,7 +20,7 @@ class ReadBufferFromPocoSocket;
class HTTPServerRequest : public HTTPRequest
{
public:
HTTPServerRequest(HTTPContextPtr context, HTTPServerResponse & response, Poco::Net::HTTPServerSession & session);
HTTPServerRequest(HTTPContextPtr context, HTTPServerResponse & response, Poco::Net::HTTPServerSession & session, const ProfileEvents::Event & read_event = ProfileEvents::end());
/// FIXME: it's a little bit inconvenient interface. The rationale is that all other ReadBuffer's wrap each other
/// via unique_ptr - but we can't inherit HTTPServerRequest from ReadBuffer and pass it around,

View File

@ -9,12 +9,15 @@
#include <Poco/Net/HTTPHeaderStream.h>
#include <Poco/Net/HTTPStream.h>
#include <Poco/StreamCopier.h>
#include <sstream>
namespace DB
{
HTTPServerResponse::HTTPServerResponse(Poco::Net::HTTPServerSession & session_) : session(session_)
HTTPServerResponse::HTTPServerResponse(Poco::Net::HTTPServerSession & session_, const ProfileEvents::Event & write_event_)
: session(session_)
, write_event(write_event_)
{
}
@ -24,42 +27,45 @@ void HTTPServerResponse::sendContinue()
hs << getVersion() << " 100 Continue\r\n\r\n";
}
std::shared_ptr<std::ostream> HTTPServerResponse::send()
std::shared_ptr<WriteBufferFromPocoSocket> HTTPServerResponse::send()
{
poco_assert(!stream);
if ((request && request->getMethod() == HTTPRequest::HTTP_HEAD) || getStatus() < 200 || getStatus() == HTTPResponse::HTTP_NO_CONTENT
|| getStatus() == HTTPResponse::HTTP_NOT_MODIFIED)
{
Poco::CountingOutputStream cs;
write(cs);
stream = std::make_shared<Poco::Net::HTTPFixedLengthOutputStream>(session, cs.chars());
write(*stream);
// Send header
Poco::Net::HTTPHeaderOutputStream hs(session);
write(hs);
stream = std::make_shared<WriteBufferFromPocoSocket>(session.socket(), write_event);
}
else if (getChunkedTransferEncoding())
{
// Send header
Poco::Net::HTTPHeaderOutputStream hs(session);
write(hs);
stream = std::make_shared<Poco::Net::HTTPChunkedOutputStream>(session);
stream = std::make_shared<HTTPWriteBufferChunked>(session.socket(), write_event);
}
else if (hasContentLength())
{
Poco::CountingOutputStream cs;
write(cs);
stream = std::make_shared<Poco::Net::HTTPFixedLengthOutputStream>(session, getContentLength64() + cs.chars());
write(*stream);
// Send header
Poco::Net::HTTPHeaderOutputStream hs(session);
write(hs);
stream = std::make_shared<HTTPWriteBufferFixedLength>(session.socket(), getContentLength(), write_event);
}
else
{
stream = std::make_shared<Poco::Net::HTTPOutputStream>(session);
setKeepAlive(false);
write(*stream);
// Send header
Poco::Net::HTTPHeaderOutputStream hs(session);
write(hs);
stream = std::make_shared<WriteBufferFromPocoSocket>(session.socket(), write_event);
}
return stream;
}
std::pair<std::shared_ptr<std::ostream>, std::shared_ptr<std::ostream>> HTTPServerResponse::beginSend()
std::pair<std::shared_ptr<WriteBufferFromPocoSocket>, std::shared_ptr<WriteBufferFromPocoSocket>> HTTPServerResponse::beginSend()
{
poco_assert(!stream);
poco_assert(!header_stream);
@ -71,40 +77,39 @@ std::pair<std::shared_ptr<std::ostream>, std::shared_ptr<std::ostream>> HTTPServ
{
throw Poco::Exception("HTTPServerResponse::beginSend is invalid for HEAD request");
}
else if (getChunkedTransferEncoding())
{
header_stream = std::make_shared<Poco::Net::HTTPHeaderOutputStream>(session);
beginWrite(*header_stream);
stream = std::make_shared<Poco::Net::HTTPChunkedOutputStream>(session);
}
else if (hasContentLength())
if (hasContentLength())
{
throw Poco::Exception("HTTPServerResponse::beginSend is invalid for response with Content-Length header");
}
// Write header to buffer
std::stringstream header; //STYLE_CHECK_ALLOW_STD_STRING_STREAM
beginWrite(header);
// Send header
auto str = header.str();
header_stream = std::make_shared<WriteBufferFromPocoSocket>(session.socket(), write_event, str.size());
header_stream->write(str);
if (getChunkedTransferEncoding())
stream = std::make_shared<HTTPWriteBufferChunked>(session.socket(), write_event);
else
{
stream = std::make_shared<Poco::Net::HTTPOutputStream>(session);
header_stream = stream;
setKeepAlive(false);
beginWrite(*stream);
}
stream = std::make_shared<WriteBufferFromPocoSocket>(session.socket(), write_event);
return std::make_pair(header_stream, stream);
}
void HTTPServerResponse::sendBuffer(const void * buffer, std::size_t length)
{
poco_assert(!stream);
setContentLength(static_cast<int>(length));
setChunkedTransferEncoding(false);
// Send header
Poco::Net::HTTPHeaderOutputStream hs(session);
write(hs);
hs.flush();
stream = std::make_shared<Poco::Net::HTTPHeaderOutputStream>(session);
write(*stream);
if (request && request->getMethod() != HTTPRequest::HTTP_HEAD)
{
stream->write(static_cast<const char *>(buffer), static_cast<std::streamsize>(length));
}
WriteBufferFromPocoSocket(session.socket(), write_event).write(static_cast<const char *>(buffer), length);
}
void HTTPServerResponse::requireAuthentication(const std::string & realm)

View File

@ -1,9 +1,12 @@
#pragma once
#include <IO/WriteBufferFromPocoSocket.h>
#include <Server/HTTP/HTTPResponse.h>
#include <Poco/Net/HTTPServerSession.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/StreamSocket.h>
#include <Poco/NumberFormatter.h>
#include <memory>
@ -11,12 +14,182 @@
namespace DB
{
class HTTPWriteBufferChunked : public WriteBufferFromPocoSocket
{
using WriteBufferFromPocoSocket::WriteBufferFromPocoSocket;
protected:
void nextImpl() override
{
if (offset() == 0)
return;
std::string chunk_header;
Poco::NumberFormatter::appendHex(chunk_header, offset());
chunk_header.append("\r\n", 2);
socketSendBytes(chunk_header.data(), static_cast<int>(chunk_header.size()));
WriteBufferFromPocoSocket::nextImpl();
socketSendBytes("\r\n", 2);
}
void finalizeImpl() override
{
WriteBufferFromPocoSocket::finalizeImpl();
socketSendBytes("0\r\n\r\n", 5);
}
};
class HTTPWriteBufferFixedLength : public WriteBufferFromPocoSocket
{
public:
explicit HTTPWriteBufferFixedLength(Poco::Net::Socket & socket_, size_t fixed_length_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: WriteBufferFromPocoSocket(socket_, buf_size)
{
fixed_length = fixed_length_;
}
explicit HTTPWriteBufferFixedLength(Poco::Net::Socket & socket_, size_t fixed_length_, const ProfileEvents::Event & write_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: WriteBufferFromPocoSocket(socket_, write_event_, buf_size)
{
fixed_length = fixed_length_;
}
protected:
void nextImpl() override
{
if (count_length >= fixed_length || offset() == 0)
return;
if (count_length + offset() > fixed_length)
pos -= offset() - (fixed_length - count_length);
count_length += offset();
WriteBufferFromPocoSocket::nextImpl();
}
private:
size_t fixed_length;
size_t count_length = 0;
};
/// Universal HTTP buffer, can be switched for different Transfer-Encoding/Content-Length on the fly
/// so it can be used to output HTTP header and then switched to appropriate mode for body
class HTTPWriteBuffer : public WriteBufferFromPocoSocket
{
public:
explicit HTTPWriteBuffer(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: WriteBufferFromPocoSocket(socket_, buf_size)
{
}
explicit HTTPWriteBuffer(Poco::Net::Socket & socket_, const ProfileEvents::Event & write_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: WriteBufferFromPocoSocket(socket_, write_event_, buf_size)
{
}
void setChunked(size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
{
chunked = true;
resizeIfNeeded(buf_size);
}
bool isChunked()
{
return chunked;
}
void setFixedLength(size_t length)
{
chunked = false;
fixed_length = length;
count_length = 0;
resizeIfNeeded(length);
}
size_t isFixedLength()
{
return chunked ? 0 : fixed_length;
}
void setPlain(size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
{
chunked = false;
fixed_length = 0;
count_length = 0;
resizeIfNeeded(buf_size);
}
bool isPlain()
{
return !(isChunked() || isFixedLength());
}
protected:
void finalizeImpl() override
{
WriteBufferFromPocoSocket::finalizeImpl();
if (chunked)
socketSendBytes("0\r\n\r\n", 5);
}
void nextImpl() override
{
if (chunked)
return nextImplChunked();
if (fixed_length)
return nextImplFixedLength();
WriteBufferFromPocoSocket::nextImpl();
}
void nextImplFixedLength()
{
if (count_length >= fixed_length || offset() == 0)
return;
if (count_length + offset() > fixed_length)
pos -= offset() - (fixed_length - count_length);
count_length += offset();
WriteBufferFromPocoSocket::nextImpl();
}
void nextImplChunked()
{
if (offset() == 0)
return;
std::string chunk_header;
Poco::NumberFormatter::appendHex(chunk_header, offset());
chunk_header.append("\r\n", 2);
socketSendBytes(chunk_header.data(), static_cast<int>(chunk_header.size()));
WriteBufferFromPocoSocket::nextImpl();
socketSendBytes("\r\n", 2);
}
void resizeIfNeeded(size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
{
if (!buf_size)
return;
auto data_size = offset();
assert(data_size <= buf_size);
memory.resize(buf_size);
set(memory.data(), memory.size(), data_size);
}
private:
bool chunked = false;
size_t fixed_length = 0;
size_t count_length = 0;
};
class HTTPServerRequest;
class HTTPServerResponse : public HTTPResponse
{
public:
explicit HTTPServerResponse(Poco::Net::HTTPServerSession & session);
explicit HTTPServerResponse(Poco::Net::HTTPServerSession & session, const ProfileEvents::Event & write_event_ = ProfileEvents::end());
void sendContinue(); /// Sends a 100 Continue response to the client.
@ -26,7 +199,7 @@ public:
///
/// Must not be called after beginSend(), sendFile(), sendBuffer()
/// or redirect() has been called.
std::shared_ptr<std::ostream> send(); /// TODO: use some WriteBuffer implementation here.
std::shared_ptr<WriteBufferFromPocoSocket> send();
/// Sends the response headers to the client
/// but do not finish headers with \r\n,
@ -34,7 +207,7 @@ public:
///
/// Must not be called after send(), sendFile(), sendBuffer()
/// or redirect() has been called.
std::pair<std::shared_ptr<std::ostream>, std::shared_ptr<std::ostream>> beginSend(); /// TODO: use some WriteBuffer implementation here.
std::pair<std::shared_ptr<WriteBufferFromPocoSocket>, std::shared_ptr<WriteBufferFromPocoSocket>> beginSend();
/// Sends the response header to the client, followed
/// by the contents of the given buffer.
@ -58,13 +231,16 @@ public:
/// Returns true if the response (header) has been sent.
bool sent() const { return !!stream; }
Poco::Net::StreamSocket & getSocket() { return session.socket(); }
void attachRequest(HTTPServerRequest * request_) { request = request_; }
private:
Poco::Net::HTTPServerSession & session;
HTTPServerRequest * request = nullptr;
std::shared_ptr<std::ostream> stream;
std::shared_ptr<std::ostream> header_stream;
ProfileEvents::Event write_event;
std::shared_ptr<WriteBufferFromPocoSocket> stream;
std::shared_ptr<WriteBufferFromPocoSocket> header_stream;
};
}

View File

@ -1,17 +1,15 @@
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/HTTPCommon.h>
#include <IO/Progress.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <memory>
#include <sstream>
#include <string>
namespace DB
{
namespace ErrorCodes
{
}
void WriteBufferFromHTTPServerResponse::startSendHeaders()
{
@ -19,27 +17,33 @@ void WriteBufferFromHTTPServerResponse::startSendHeaders()
{
headers_started_sending = true;
if (response.getChunkedTransferEncoding())
setChunked();
if (add_cors_header)
response.set("Access-Control-Allow-Origin", "*");
setResponseDefaultHeaders(response, keep_alive_timeout);
if (!is_http_method_head)
std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
std::stringstream header; //STYLE_CHECK_ALLOW_STD_STRING_STREAM
response.beginWrite(header);
auto header_str = header.str();
socketSendBytes(header_str.data(), header_str.size());
}
}
void WriteBufferFromHTTPServerResponse::writeHeaderProgressImpl(const char * header_name)
{
if (headers_finished_sending)
if (is_http_method_head || headers_finished_sending || !headers_started_sending)
return;
WriteBufferFromOwnString progress_string_writer;
accumulated_progress.writeJSON(progress_string_writer);
if (response_header_ostr)
*response_header_ostr << header_name << progress_string_writer.str() << "\r\n" << std::flush;
socketSendBytes(header_name, strlen(header_name));
socketSendBytes(progress_string_writer.str().data(), progress_string_writer.str().size());
socketSendBytes("\r\n", 2);
}
void WriteBufferFromHTTPServerResponse::writeHeaderSummary()
@ -57,30 +61,30 @@ void WriteBufferFromHTTPServerResponse::writeExceptionCode()
{
if (headers_finished_sending || !exception_code)
return;
if (response_header_ostr)
*response_header_ostr << "X-ClickHouse-Exception-Code: " << exception_code << "\r\n" << std::flush;
if (headers_started_sending)
{
socketSendBytes("X-ClickHouse-Exception-Code: ", sizeof("X-ClickHouse-Exception-Code: ") - 1);
auto str_code = std::to_string(exception_code);
socketSendBytes(str_code.data(), str_code.size());
socketSendBytes("\r\n", 2);
}
}
void WriteBufferFromHTTPServerResponse::finishSendHeaders()
{
if (!headers_finished_sending)
{
writeHeaderSummary();
writeExceptionCode();
headers_finished_sending = true;
if (headers_finished_sending)
return;
if (!is_http_method_head)
{
/// Send end of headers delimiter.
if (response_header_ostr)
*response_header_ostr << "\r\n" << std::flush;
}
else
{
if (!response_body_ostr)
response_body_ostr = response.send();
}
}
if (!headers_started_sending)
startSendHeaders();
writeHeaderSummary();
writeExceptionCode();
headers_finished_sending = true;
/// Send end of headers delimiter.
socketSendBytes("\r\n", 2);
}
@ -89,47 +93,19 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
if (!initialized)
{
std::lock_guard lock(mutex);
/// Initialize as early as possible since if the code throws,
/// next() should not be called anymore.
initialized = true;
if (compression_method != CompressionMethod::None)
response.set("Content-Encoding", toContentEncodingName(compression_method));
startSendHeaders();
if (!out && !is_http_method_head)
{
if (compress)
{
auto content_encoding_name = toContentEncodingName(compression_method);
*response_header_ostr << "Content-Encoding: " << content_encoding_name << "\r\n";
}
/// We reuse our buffer in "out" to avoid extra allocations and copies.
if (compress)
out = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromOStream>(*response_body_ostr),
compress ? compression_method : CompressionMethod::None,
compression_level,
working_buffer.size(),
working_buffer.begin());
else
out = std::make_unique<WriteBufferFromOStream>(
*response_body_ostr,
working_buffer.size(),
working_buffer.begin());
}
finishSendHeaders();
}
if (out)
{
out->buffer() = buffer();
out->position() = position();
out->next();
}
if (!is_http_method_head)
HTTPWriteBuffer::nextImpl();
}
@ -137,14 +113,11 @@ WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
HTTPServerResponse & response_,
bool is_http_method_head_,
UInt64 keep_alive_timeout_,
bool compress_,
CompressionMethod compression_method_)
: BufferWithOwnMemory<WriteBuffer>(DBMS_DEFAULT_BUFFER_SIZE)
const ProfileEvents::Event & write_event_)
: HTTPWriteBuffer(response_.getSocket(), write_event_)
, response(response_)
, is_http_method_head(is_http_method_head_)
, keep_alive_timeout(keep_alive_timeout_)
, compress(compress_)
, compression_method(compression_method_)
{
}
@ -169,6 +142,15 @@ void WriteBufferFromHTTPServerResponse::onProgress(const Progress & progress)
}
}
void WriteBufferFromHTTPServerResponse::setExceptionCode(int exception_code_)
{
std::lock_guard lock(mutex);
if (headers_started_sending)
exception_code = exception_code_;
else
response.set("X-ClickHouse-Exception-Code", toString<int>(exception_code_));
}
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
finalize();
@ -176,30 +158,20 @@ WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
void WriteBufferFromHTTPServerResponse::finalizeImpl()
{
try
if (!headers_finished_sending)
{
next();
if (out)
out->finalize();
out.reset();
/// Catch write-after-finalize bugs.
set(nullptr, 0);
}
catch (...)
{
/// Avoid calling WriteBufferFromOStream::next() from dtor
/// (via WriteBufferFromHTTPServerResponse::next())
out.reset();
throw;
}
if (!offset())
{
/// If no remaining data, just send headers.
std::lock_guard lock(mutex);
/// If no body data just send header
startSendHeaders();
if (!initialized && offset() && compression_method != CompressionMethod::None)
socketSendStr("Content-Encoding: " + toContentEncodingName(compression_method) + "\r\n");
finishSendHeaders();
}
if (!is_http_method_head)
HTTPWriteBuffer::finalizeImpl();
}

View File

@ -5,8 +5,8 @@
#include <IO/HTTPCommon.h>
#include <IO/Progress.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromOStream.h>
#include <Server/HTTP/HTTPServerResponse.h>
#include <Poco/Net/StreamSocket.h>
#include <Common/NetException.h>
#include <Common/Stopwatch.h>
@ -17,48 +17,26 @@
namespace DB
{
/// The difference from WriteBufferFromOStream is that this buffer gets the underlying std::ostream
/// (using response.send()) only after data is flushed for the first time. This is needed in HTTP
/// servers to change some HTTP headers (e.g. response code) before any data is sent to the client
/// (headers can't be changed after response.send() is called).
///
/// In short, it allows delaying the call to response.send().
///
/// Additionally, supports HTTP response compression (in this case corresponding Content-Encoding
/// header will be set).
/// Postpone sending HTTP header until first data is flushed. This is needed in HTTP servers
/// to change some HTTP headers (e.g. response code) before any data is sent to the client.
///
/// Also this class write and flush special X-ClickHouse-Progress HTTP headers
/// if no data was sent at the time of progress notification.
/// This allows to implement progress bar in HTTP clients.
class WriteBufferFromHTTPServerResponse final : public BufferWithOwnMemory<WriteBuffer>
class WriteBufferFromHTTPServerResponse final : public HTTPWriteBuffer
{
public:
WriteBufferFromHTTPServerResponse(
HTTPServerResponse & response_,
bool is_http_method_head_,
UInt64 keep_alive_timeout_,
bool compress_ = false, /// If true - set Content-Encoding header and compress the result.
CompressionMethod compression_method_ = CompressionMethod::None);
const ProfileEvents::Event & write_event_ = ProfileEvents::end());
~WriteBufferFromHTTPServerResponse() override;
/// Writes progress in repeating HTTP headers.
void onProgress(const Progress & progress);
/// Turn compression on or off.
/// The setting has any effect only if HTTP headers haven't been sent yet.
void setCompression(bool enable_compression)
{
compress = enable_compression;
}
/// Set compression level if the compression is turned on.
/// The setting has any effect only if HTTP headers haven't been sent yet.
void setCompressionLevel(int level)
{
compression_level = level;
}
/// Turn CORS on or off.
/// The setting has any effect only if HTTP headers haven't been sent yet.
void addHeaderCORS(bool enable_cors)
@ -75,7 +53,13 @@ public:
send_progress_interval_ms = send_progress_interval_ms_;
}
void setExceptionCode(int exception_code_) { exception_code = exception_code_; }
/// Content-Encoding header will be set on first data package
void setCompressionMethodHeader(const CompressionMethod & compression_method_)
{
compression_method = compression_method_;
}
void setExceptionCode(int exception_code_);
private:
/// Send at least HTTP headers if no data has been sent yet.
@ -108,14 +92,7 @@ private:
bool is_http_method_head;
bool add_cors_header = false;
size_t keep_alive_timeout = 0;
bool compress = false;
CompressionMethod compression_method;
int compression_level = 1;
std::shared_ptr<std::ostream> response_body_ostr;
std::shared_ptr<std::ostream> response_header_ostr;
std::unique_ptr<WriteBuffer> out;
bool initialized = false;
bool headers_started_sending = false;
@ -126,6 +103,8 @@ private:
size_t send_progress_interval_ms = 100;
Stopwatch progress_watch;
CompressionMethod compression_method = CompressionMethod::None;
int exception_code = 0;
std::mutex mutex; /// progress callback could be called from different threads.

View File

@ -46,7 +46,9 @@
#include <Poco/String.h>
#include <Poco/Net/SocketAddress.h>
#include <algorithm>
#include <chrono>
#include <memory>
#include <sstream>
#ifdef __clang__
@ -301,7 +303,7 @@ void HTTPHandler::pushDelayedResults(Output & used_output)
std::vector<WriteBufferPtr> write_buffers;
ConcatReadBuffer::Buffers read_buffers;
auto * cascade_buffer = typeid_cast<CascadeWriteBuffer *>(used_output.out_maybe_delayed_and_compressed.get());
auto * cascade_buffer = typeid_cast<CascadeWriteBuffer *>(used_output.out_maybe_delayed_and_compressed);
if (!cascade_buffer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected CascadeWriteBuffer");
@ -553,7 +555,8 @@ void HTTPHandler::processQuery(
HTMLForm & params,
HTTPServerResponse & response,
Output & used_output,
std::optional<CurrentThread::QueryScope> & query_scope)
std::optional<CurrentThread::QueryScope> & query_scope,
const ProfileEvents::Event & write_event)
{
using namespace Poco::Net;
@ -564,6 +567,9 @@ void HTTPHandler::processQuery(
/// The user could specify session identifier and session timeout.
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
SCOPE_EXIT({ session->releaseSessionID(); });
String session_id;
std::chrono::steady_clock::duration session_timeout;
bool session_is_set = params.has("session_id");
@ -616,15 +622,35 @@ void HTTPHandler::processQuery(
size_t buffer_size_http = DBMS_DEFAULT_BUFFER_SIZE;
size_t buffer_size_memory = (buffer_size_total > buffer_size_http) ? buffer_size_total : 0;
used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(
response,
request.getMethod() == HTTPRequest::HTTP_HEAD,
context->getServerSettings().keep_alive_timeout.totalSeconds(),
client_supports_http_compression,
http_response_compression_method);
bool enable_http_compression = params.getParsed<bool>("enable_http_compression", context->getSettingsRef().enable_http_compression);
Int64 http_zlib_compression_level = params.getParsed<Int64>("http_zlib_compression_level", context->getSettingsRef().http_zlib_compression_level);
used_output.out_holder =
std::make_shared<WriteBufferFromHTTPServerResponse>(
response,
request.getMethod() == HTTPRequest::HTTP_HEAD,
context->getServerSettings().keep_alive_timeout.totalSeconds(),
write_event);
used_output.out = used_output.out_holder;
used_output.out_maybe_compressed = used_output.out_holder;
if (client_supports_http_compression && enable_http_compression)
{
used_output.out_holder->setCompressionMethodHeader(http_response_compression_method);
used_output.wrap_compressed_holder =
wrapWriteBufferWithCompressionMethod(
used_output.out_holder.get(),
http_response_compression_method,
static_cast<int>(http_zlib_compression_level),
DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0, false);
used_output.out = used_output.wrap_compressed_holder;
}
if (internal_compression)
used_output.out_maybe_compressed = std::make_shared<CompressedWriteBuffer>(*used_output.out);
{
used_output.out_compressed_holder = std::make_shared<CompressedWriteBuffer>(*used_output.out);
used_output.out_maybe_compressed = used_output.out_compressed_holder;
}
else
used_output.out_maybe_compressed = used_output.out;
@ -664,12 +690,12 @@ void HTTPHandler::processQuery(
cascade_buffer2.emplace_back(push_memory_buffer_and_continue);
}
used_output.out_maybe_delayed_and_compressed = std::make_shared<CascadeWriteBuffer>(
std::move(cascade_buffer1), std::move(cascade_buffer2));
used_output.out_delayed_and_compressed_holder = std::make_unique<CascadeWriteBuffer>(std::move(cascade_buffer1), std::move(cascade_buffer2));
used_output.out_maybe_delayed_and_compressed = used_output.out_delayed_and_compressed_holder.get();
}
else
{
used_output.out_maybe_delayed_and_compressed = used_output.out_maybe_compressed;
used_output.out_maybe_delayed_and_compressed = used_output.out_maybe_compressed.get();
}
/// Request body can be compressed using algorithm specified in the Content-Encoding header.
@ -798,14 +824,8 @@ void HTTPHandler::processQuery(
const auto & query = getQuery(request, params, context);
std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query);
/// HTTP response compression is turned on only if the client signalled that they support it
/// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on.
used_output.out->setCompression(client_supports_http_compression && settings.enable_http_compression);
if (client_supports_http_compression)
used_output.out->setCompressionLevel(static_cast<int>(settings.http_zlib_compression_level));
used_output.out->setSendProgress(settings.send_progress_in_http_headers);
used_output.out->setSendProgressInterval(settings.http_headers_progress_interval_ms);
used_output.out_holder->setSendProgress(settings.send_progress_in_http_headers);
used_output.out_holder->setSendProgressInterval(settings.http_headers_progress_interval_ms);
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
/// checksums of client data compressed with internal algorithm are not checked.
@ -816,7 +836,7 @@ void HTTPHandler::processQuery(
/// Note that whether the header is added is determined by the settings, and we can only get the user settings after authentication.
/// Once the authentication fails, the header can't be added.
if (settings.add_http_cors_header && !request.get("Origin", "").empty() && !config.has("http_options_response"))
used_output.out->addHeaderCORS(true);
used_output.out_holder->addHeaderCORS(true);
auto append_callback = [my_context = context] (ProgressCallback callback)
{
@ -835,7 +855,7 @@ void HTTPHandler::processQuery(
/// Note that we add it unconditionally so the progress is available for `X-ClickHouse-Summary`
append_callback([&used_output](const Progress & progress)
{
used_output.out->onProgress(progress);
used_output.out_holder->onProgress(progress);
});
if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close)
@ -888,6 +908,8 @@ void HTTPHandler::processQuery(
{},
handle_exception_in_output_format);
session->releaseSessionID();
if (used_output.hasDelayed())
{
/// TODO: set Content-Length if possible
@ -902,10 +924,8 @@ void HTTPHandler::trySendExceptionToClient(
const std::string & s, int exception_code, HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output)
try
{
/// In case data has already been sent, like progress headers, try using the output buffer to
/// set the exception code since it will be able to append it if it hasn't finished writing headers
if (response.sent() && used_output.out)
used_output.out->setExceptionCode(exception_code);
if (used_output.out_holder)
used_output.out_holder->setExceptionCode(exception_code);
else
response.set("X-ClickHouse-Exception-Code", toString<int>(exception_code));
@ -930,10 +950,10 @@ try
response.setStatusAndReason(exceptionCodeToHTTPStatus(exception_code));
}
if (!response.sent() && !used_output.out_maybe_compressed && !used_output.exception_is_written)
if (!used_output.out_holder && !used_output.exception_is_written)
{
/// If nothing was sent yet and we don't even know if we must compress the response.
*response.send() << s << std::endl;
WriteBufferFromHTTPServerResponse(response, request.getMethod() == HTTPRequest::HTTP_HEAD, DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT).writeln(s);
}
else if (used_output.out_maybe_compressed)
{
@ -943,7 +963,8 @@ try
/// do not call finalize here for CascadeWriteBuffer used_output.out_maybe_delayed_and_compressed,
/// exception is written into used_output.out_maybe_compressed later
/// HTTPHandler::trySendExceptionToClient is called with exception context, it is Ok to destroy buffers
used_output.out_maybe_delayed_and_compressed.reset();
used_output.out_delayed_and_compressed_holder.reset();
used_output.out_maybe_delayed_and_compressed = nullptr;
}
if (!used_output.exception_is_written)
@ -953,12 +974,12 @@ try
/// Also HTTP code 200 could have already been sent.
/// If buffer has data, and that data wasn't sent yet, then no need to send that data
bool data_sent = used_output.out->count() != used_output.out->offset();
bool data_sent = used_output.out_holder->count() != used_output.out_holder->offset();
if (!data_sent)
{
used_output.out_maybe_compressed->position() = used_output.out_maybe_compressed->buffer().begin();
used_output.out->position() = used_output.out->buffer().begin();
used_output.out_holder->position() = used_output.out_holder->buffer().begin();
}
writeString(s, *used_output.out_maybe_compressed);
@ -989,7 +1010,7 @@ catch (...)
}
void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event)
{
setThreadName("HTTPHandler");
ThreadStatus thread_status;
@ -1078,7 +1099,7 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
"is no Content-Length header for POST request");
}
processQuery(request, params, response, used_output, query_scope);
processQuery(request, params, response, used_output, query_scope, write_event);
if (request_credentials)
LOG_DEBUG(log, "Authentication in progress...");
else

View File

@ -6,6 +6,8 @@
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <IO/CascadeWriteBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#ifdef __clang__
# pragma clang diagnostic push
@ -40,7 +42,7 @@ public:
HTTPHandler(IServer & server_, const std::string & name, const std::optional<String> & content_type_override_);
virtual ~HTTPHandler() override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
/// This method is called right before the query execution.
virtual void customizeContext(HTTPServerRequest & /* request */, ContextMutablePtr /* context */, ReadBuffer & /* body */) {}
@ -61,11 +63,22 @@ private:
* WriteBufferFromHTTPServerResponse out
*/
std::shared_ptr<WriteBufferFromHTTPServerResponse> out;
/// Points to 'out' or to CompressedWriteBuffer(*out), depending on settings.
/// Holds original response buffer
std::shared_ptr<WriteBufferFromHTTPServerResponse> out_holder;
/// If HTTP compression is enabled holds compression wrapper over original response buffer
std::shared_ptr<WriteBuffer> wrap_compressed_holder;
/// Points either to out_holder or to wrap_compressed_holder
std::shared_ptr<WriteBuffer> out;
/// If internal compression is enabled holds compression wrapper over out buffer
std::shared_ptr<CompressedWriteBuffer> out_compressed_holder;
/// Points to 'out' or to CompressedWriteBuffer(*out)
std::shared_ptr<WriteBuffer> out_maybe_compressed;
/// Points to 'out' or to CompressedWriteBuffer(*out) or to CascadeWriteBuffer.
std::shared_ptr<WriteBuffer> out_maybe_delayed_and_compressed;
/// If output should be delayed holds cascade buffer
std::unique_ptr<CascadeWriteBuffer> out_delayed_and_compressed_holder;
/// Points to out_maybe_compressed or to CascadeWriteBuffer.
WriteBuffer * out_maybe_delayed_and_compressed = nullptr;
bool finalized = false;
@ -73,7 +86,7 @@ private:
inline bool hasDelayed() const
{
return out_maybe_delayed_and_compressed != out_maybe_compressed;
return out_maybe_delayed_and_compressed != out_maybe_compressed.get();
}
inline void finalize()
@ -82,11 +95,9 @@ private:
return;
finalized = true;
if (out_maybe_delayed_and_compressed)
out_maybe_delayed_and_compressed->finalize();
if (out_maybe_compressed)
out_maybe_compressed->finalize();
if (out)
else if (out)
out->finalize();
}
@ -135,7 +146,8 @@ private:
HTMLForm & params,
HTTPServerResponse & response,
Output & used_output,
std::optional<CurrentThread::QueryScope> & query_scope);
std::optional<CurrentThread::QueryScope> & query_scope,
const ProfileEvents::Event & write_event);
void trySendExceptionToClient(
const std::string & s,

View File

@ -77,7 +77,7 @@ void InterserverIOHTTPHandler::processQuery(HTTPServerRequest & request, HTTPSer
}
void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event)
{
setThreadName("IntersrvHandler");
ThreadStatus thread_status;
@ -89,7 +89,7 @@ void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPSe
Output used_output;
const auto keep_alive_timeout = server.context()->getServerSettings().keep_alive_timeout.totalSeconds();
used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(
response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout, write_event);
auto write_response = [&](const std::string & message)
{

View File

@ -30,7 +30,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
struct Output

View File

@ -19,7 +19,7 @@
namespace DB
{
void KeeperReadinessHandler::handleRequest(HTTPServerRequest & /*request*/, HTTPServerResponse & response)
void KeeperReadinessHandler::handleRequest(HTTPServerRequest & /*request*/, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{
@ -58,7 +58,7 @@ void KeeperReadinessHandler::handleRequest(HTTPServerRequest & /*request*/, HTTP
if (!response.sent())
{
/// We have not sent anything yet and we don't even know if we need to compress response.
*response.send() << getCurrentExceptionMessage(false) << std::endl;
*response.send() << getCurrentExceptionMessage(false) << '\n';
}
}
catch (...)

View File

@ -22,7 +22,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
};
HTTPRequestHandlerFactoryPtr

View File

@ -70,13 +70,17 @@ MySQLHandler::MySQLHandler(
IServer & server_,
TCPServer & tcp_server_,
const Poco::Net::StreamSocket & socket_,
bool ssl_enabled, uint32_t connection_id_)
bool ssl_enabled, uint32_t connection_id_,
const ProfileEvents::Event & read_event_,
const ProfileEvents::Event & write_event_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, tcp_server(tcp_server_)
, log(&Poco::Logger::get("MySQLHandler"))
, connection_id(connection_id_)
, auth_plugin(new MySQLProtocol::Authentication::Native41())
, read_event(read_event_)
, write_event(write_event_)
{
server_capabilities = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF;
if (ssl_enabled)
@ -98,8 +102,8 @@ void MySQLHandler::run()
session->setClientConnectionId(connection_id);
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
out = std::make_shared<WriteBufferFromPocoSocket>(socket());
in = std::make_shared<ReadBufferFromPocoSocket>(socket(), read_event);
out = std::make_shared<WriteBufferFromPocoSocket>(socket(), write_event);
packet_endpoint = std::make_shared<MySQLProtocol::PacketEndpoint>(*in, *out, sequence_id);
try
@ -489,8 +493,10 @@ MySQLHandlerSSL::MySQLHandlerSSL(
bool ssl_enabled,
uint32_t connection_id_,
RSA & public_key_,
RSA & private_key_)
: MySQLHandler(server_, tcp_server_, socket_, ssl_enabled, connection_id_)
RSA & private_key_,
const ProfileEvents::Event & read_event_,
const ProfileEvents::Event & write_event_)
: MySQLHandler(server_, tcp_server_, socket_, ssl_enabled, connection_id_, read_event_, write_event_)
, public_key(public_key_)
, private_key(private_key_)
{}

View File

@ -42,7 +42,9 @@ public:
TCPServer & tcp_server_,
const Poco::Net::StreamSocket & socket_,
bool ssl_enabled,
uint32_t connection_id_);
uint32_t connection_id_,
const ProfileEvents::Event & read_event_ = ProfileEvents::end(),
const ProfileEvents::Event & write_event_ = ProfileEvents::end());
void run() final;
@ -102,6 +104,9 @@ protected:
std::shared_ptr<ReadBufferFromPocoSocket> in;
std::shared_ptr<WriteBuffer> out;
bool secure_connection = false;
ProfileEvents::Event read_event;
ProfileEvents::Event write_event;
};
#if USE_SSL
@ -115,7 +120,9 @@ public:
bool ssl_enabled,
uint32_t connection_id_,
RSA & public_key_,
RSA & private_key_);
RSA & private_key_,
const ProfileEvents::Event & read_event_ = ProfileEvents::end(),
const ProfileEvents::Event & write_event_ = ProfileEvents::end());
private:
void authPluginSSL() override;

View File

@ -21,9 +21,11 @@ namespace ErrorCodes
extern const int OPENSSL_ERROR;
}
MySQLHandlerFactory::MySQLHandlerFactory(IServer & server_)
MySQLHandlerFactory::MySQLHandlerFactory(IServer & server_, const ProfileEvents::Event & read_event_, const ProfileEvents::Event & write_event_)
: server(server_)
, log(&Poco::Logger::get("MySQLHandlerFactory"))
, read_event(read_event_)
, write_event(write_event_)
{
#if USE_SSL
try

View File

@ -4,6 +4,7 @@
#include <memory>
#include <Server/IServer.h>
#include <Server/TCPServerConnectionFactory.h>
#include <Common/ProfileEvents.h>
#include "config.h"
@ -37,8 +38,11 @@ private:
#endif
std::atomic<unsigned> last_connection_id = 0;
ProfileEvents::Event read_event;
ProfileEvents::Event write_event;
public:
explicit MySQLHandlerFactory(IServer & server_);
explicit MySQLHandlerFactory(IServer & server_, const ProfileEvents::Event & read_event_ = ProfileEvents::end(), const ProfileEvents::Event & write_event_ = ProfileEvents::end());
void readRSAKeys();

View File

@ -5,7 +5,7 @@
namespace DB
{
void NotFoundHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void NotFoundHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{

View File

@ -10,7 +10,7 @@ class NotFoundHandler : public HTTPRequestHandler
{
public:
NotFoundHandler(std::vector<std::string> hints_) : hints(std::move(hints_)) {}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
private:
std::vector<std::string> hints;
};

View File

@ -32,12 +32,16 @@ PostgreSQLHandler::PostgreSQLHandler(
TCPServer & tcp_server_,
bool ssl_enabled_,
Int32 connection_id_,
std::vector<std::shared_ptr<PostgreSQLProtocol::PGAuthentication::AuthenticationMethod>> & auth_methods_)
std::vector<std::shared_ptr<PostgreSQLProtocol::PGAuthentication::AuthenticationMethod>> & auth_methods_,
const ProfileEvents::Event & read_event_,
const ProfileEvents::Event & write_event_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, tcp_server(tcp_server_)
, ssl_enabled(ssl_enabled_)
, connection_id(connection_id_)
, read_event(read_event_)
, write_event(write_event_)
, authentication_manager(auth_methods_)
{
changeIO(socket());
@ -45,8 +49,8 @@ PostgreSQLHandler::PostgreSQLHandler(
void PostgreSQLHandler::changeIO(Poco::Net::StreamSocket & socket)
{
in = std::make_shared<ReadBufferFromPocoSocket>(socket);
out = std::make_shared<WriteBufferFromPocoSocket>(socket);
in = std::make_shared<ReadBufferFromPocoSocket>(socket, read_event);
out = std::make_shared<WriteBufferFromPocoSocket>(socket, write_event);
message_transport = std::make_shared<PostgreSQLProtocol::Messaging::MessageTransport>(in.get(), out.get());
}

View File

@ -33,7 +33,9 @@ public:
TCPServer & tcp_server_,
bool ssl_enabled_,
Int32 connection_id_,
std::vector<std::shared_ptr<PostgreSQLProtocol::PGAuthentication::AuthenticationMethod>> & auth_methods_);
std::vector<std::shared_ptr<PostgreSQLProtocol::PGAuthentication::AuthenticationMethod>> & auth_methods_,
const ProfileEvents::Event & read_event_ = ProfileEvents::end(),
const ProfileEvents::Event & write_event_ = ProfileEvents::end());
void run() final;
@ -51,6 +53,9 @@ private:
std::shared_ptr<WriteBuffer> out;
std::shared_ptr<PostgreSQLProtocol::Messaging::MessageTransport> message_transport;
ProfileEvents::Event read_event;
ProfileEvents::Event write_event;
#if USE_SSL
std::shared_ptr<Poco::Net::SecureStreamSocket> ss;
#endif

View File

@ -5,9 +5,11 @@
namespace DB
{
PostgreSQLHandlerFactory::PostgreSQLHandlerFactory(IServer & server_)
PostgreSQLHandlerFactory::PostgreSQLHandlerFactory(IServer & server_, const ProfileEvents::Event & read_event_, const ProfileEvents::Event & write_event_)
: server(server_)
, log(&Poco::Logger::get("PostgreSQLHandlerFactory"))
, read_event(read_event_)
, write_event(write_event_)
{
auth_methods =
{
@ -20,7 +22,7 @@ Poco::Net::TCPServerConnection * PostgreSQLHandlerFactory::createConnection(cons
{
Int32 connection_id = last_connection_id++;
LOG_TRACE(log, "PostgreSQL connection. Id: {}. Address: {}", connection_id, socket.peerAddress().toString());
return new PostgreSQLHandler(socket, server, tcp_server, ssl_enabled, connection_id, auth_methods);
return new PostgreSQLHandler(socket, server, tcp_server, ssl_enabled, connection_id, auth_methods, read_event, write_event);
}
}

View File

@ -15,6 +15,8 @@ class PostgreSQLHandlerFactory : public TCPServerConnectionFactory
private:
IServer & server;
Poco::Logger * log;
ProfileEvents::Event read_event;
ProfileEvents::Event write_event;
#if USE_SSL
bool ssl_enabled = true;
@ -26,7 +28,7 @@ private:
std::vector<std::shared_ptr<PostgreSQLProtocol::PGAuthentication::AuthenticationMethod>> auth_methods;
public:
explicit PostgreSQLHandlerFactory(IServer & server_);
explicit PostgreSQLHandlerFactory(IServer & server_, const ProfileEvents::Event & read_event_ = ProfileEvents::end(), const ProfileEvents::Event & write_event_ = ProfileEvents::end());
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & server) override;
};

View File

@ -13,7 +13,7 @@
namespace DB
{
void PrometheusRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void PrometheusRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event)
{
try
{
@ -27,7 +27,7 @@ void PrometheusRequestHandler::handleRequest(HTTPServerRequest & request, HTTPSe
response.setContentType("text/plain; version=0.0.4; charset=UTF-8");
WriteBufferFromHTTPServerResponse wb(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
WriteBufferFromHTTPServerResponse wb(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout, write_event);
try
{
metrics_writer.write(wb);

View File

@ -22,7 +22,7 @@ public:
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
};
}

View File

@ -22,7 +22,7 @@ ReplicasStatusHandler::ReplicasStatusHandler(IServer & server) : WithContext(ser
{
}
void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
try
{
@ -113,7 +113,7 @@ void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServe
if (!response.sent())
{
/// We have not sent anything yet and we don't even know if we need to compress response.
*response.send() << getCurrentExceptionMessage(false) << std::endl;
*response.send() << getCurrentExceptionMessage(false) << '\n';
}
}
catch (...)

View File

@ -14,7 +14,7 @@ class ReplicasStatusHandler : public HTTPRequestHandler, WithContext
public:
explicit ReplicasStatusHandler(IServer & server_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
};

View File

@ -33,9 +33,11 @@ namespace ErrorCodes
extern const int INVALID_CONFIG_PARAMETER;
}
static inline WriteBufferPtr
static inline std::unique_ptr<WriteBuffer>
responseWriteBuffer(HTTPServerRequest & request, HTTPServerResponse & response, UInt64 keep_alive_timeout)
{
auto buf = std::unique_ptr<WriteBuffer>(new WriteBufferFromHTTPServerResponse(response, request.getMethod() == HTTPRequest::HTTP_HEAD, keep_alive_timeout));
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
CompressionMethod http_response_compression_method = CompressionMethod::None;
@ -43,14 +45,11 @@ responseWriteBuffer(HTTPServerRequest & request, HTTPServerResponse & response,
if (!http_response_compression_methods.empty())
http_response_compression_method = chooseHTTPCompressionMethod(http_response_compression_methods);
bool client_supports_http_compression = http_response_compression_method != CompressionMethod::None;
if (http_response_compression_method == CompressionMethod::None)
return buf;
return std::make_shared<WriteBufferFromHTTPServerResponse>(
response,
request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD,
keep_alive_timeout,
client_supports_http_compression,
http_response_compression_method);
response.set("Content-Encoding", toContentEncodingName(http_response_compression_method));
return wrapWriteBufferWithCompressionMethod(std::move(buf), http_response_compression_method, 1);
}
static inline void trySendExceptionToClient(
@ -69,7 +68,7 @@ static inline void trySendExceptionToClient(
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << s << std::endl;
*response.send() << s << '\n';
else
{
if (out.count() != out.offset())
@ -88,10 +87,10 @@ static inline void trySendExceptionToClient(
}
}
void StaticRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void StaticRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
auto keep_alive_timeout = server.context()->getServerSettings().keep_alive_timeout.totalSeconds();
const auto & out = responseWriteBuffer(request, response, keep_alive_timeout);
auto out = responseWriteBuffer(request, response, keep_alive_timeout);
try
{

View File

@ -29,7 +29,7 @@ public:
void writeResponse(WriteBuffer & out);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
};
}

View File

@ -184,23 +184,27 @@ void validateClientInfo(const ClientInfo & session_client_info, const ClientInfo
namespace DB
{
TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_)
TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_, const ProfileEvents::Event & read_event_, const ProfileEvents::Event & write_event_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, tcp_server(tcp_server_)
, parse_proxy_protocol(parse_proxy_protocol_)
, log(&Poco::Logger::get("TCPHandler"))
, read_event(read_event_)
, write_event(write_event_)
, server_display_name(std::move(server_display_name_))
{
}
TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, TCPProtocolStackData & stack_data, std::string server_display_name_)
TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, TCPProtocolStackData & stack_data, std::string server_display_name_, const ProfileEvents::Event & read_event_, const ProfileEvents::Event & write_event_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, tcp_server(tcp_server_)
, log(&Poco::Logger::get("TCPHandler"))
, forwarded_for(stack_data.forwarded_for)
, certificate(stack_data.certificate)
, read_event(read_event_)
, write_event(write_event_)
, default_database(stack_data.default_database)
, server_display_name(std::move(server_display_name_))
{
@ -233,8 +237,8 @@ void TCPHandler::runImpl()
socket().setSendTimeout(send_timeout);
socket().setNoDelay(true);
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
out = std::make_shared<WriteBufferFromPocoSocket>(socket());
in = std::make_shared<ReadBufferFromPocoSocket>(socket(), read_event);
out = std::make_shared<WriteBufferFromPocoSocket>(socket(), write_event);
/// Support for PROXY protocol
if (parse_proxy_protocol && !receiveProxyHeader())

View File

@ -147,8 +147,8 @@ public:
* because it allows to check the IP ranges of the trusted proxy.
* Proxy-forwarded (original client) IP address is used for quota accounting if quota is keyed by forwarded IP.
*/
TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_);
TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, TCPProtocolStackData & stack_data, std::string server_display_name_);
TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_, const ProfileEvents::Event & read_event_ = ProfileEvents::end(), const ProfileEvents::Event & write_event_ = ProfileEvents::end());
TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, TCPProtocolStackData & stack_data, std::string server_display_name_, const ProfileEvents::Event & read_event_ = ProfileEvents::end(), const ProfileEvents::Event & write_event_ = ProfileEvents::end());
~TCPHandler() override;
void run() override;
@ -191,6 +191,9 @@ private:
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;
ProfileEvents::Event read_event;
ProfileEvents::Event write_event;
/// Time after the last check to stop the request and send the progress.
Stopwatch after_check_cancelled;
Stopwatch after_send_progress;

View File

@ -21,6 +21,9 @@ private:
Poco::Logger * log;
std::string server_display_name;
ProfileEvents::Event read_event;
ProfileEvents::Event write_event;
class DummyTCPHandler : public Poco::Net::TCPServerConnection
{
public:
@ -33,9 +36,11 @@ public:
* and set the information about forwarded address accordingly.
* See https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt
*/
TCPHandlerFactory(IServer & server_, bool secure_, bool parse_proxy_protocol_)
TCPHandlerFactory(IServer & server_, bool secure_, bool parse_proxy_protocol_, const ProfileEvents::Event & read_event_ = ProfileEvents::end(), const ProfileEvents::Event & write_event_ = ProfileEvents::end())
: server(server_), parse_proxy_protocol(parse_proxy_protocol_)
, log(&Poco::Logger::get(std::string("TCP") + (secure_ ? "S" : "") + "HandlerFactory"))
, read_event(read_event_)
, write_event(write_event_)
{
server_display_name = server.config().getString("display_name", getFQDNOrHostName());
}
@ -45,8 +50,7 @@ public:
try
{
LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString());
return new TCPHandler(server, tcp_server, socket, parse_proxy_protocol, server_display_name);
return new TCPHandler(server, tcp_server, socket, parse_proxy_protocol, server_display_name, read_event, write_event);
}
catch (const Poco::Net::NetException &)
{
@ -60,8 +64,7 @@ public:
try
{
LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString());
return new TCPHandler(server, tcp_server, socket, stack_data, server_display_name);
return new TCPHandler(server, tcp_server, socket, stack_data, server_display_name, read_event, write_event);
}
catch (const Poco::Net::NetException &)
{

View File

@ -1,5 +1,6 @@
#include "WebUIRequestHandler.h"
#include "IServer.h"
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/LayeredConfiguration.h>
@ -36,7 +37,7 @@ WebUIRequestHandler::WebUIRequestHandler(IServer & server_)
}
void WebUIRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void WebUIRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & /*write_event*/)
{
auto keep_alive_timeout = server.context()->getServerSettings().keep_alive_timeout.totalSeconds();
@ -50,7 +51,7 @@ void WebUIRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerR
if (request.getURI().starts_with("/play"))
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_OK);
*response.send() << std::string_view(reinterpret_cast<const char *>(gresource_play_htmlData), gresource_play_htmlSize);
WriteBufferFromHTTPServerResponse(response, request.getMethod() == HTTPRequest::HTTP_HEAD, keep_alive_timeout).write(reinterpret_cast<const char *>(gresource_play_htmlData), gresource_play_htmlSize);
}
else if (request.getURI().starts_with("/dashboard"))
{
@ -66,17 +67,17 @@ void WebUIRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerR
static re2::RE2 uplot_url = R"(https://[^\s"'`]+u[Pp]lot[^\s"'`]*\.js)";
RE2::Replace(&html, uplot_url, "/js/uplot.js");
*response.send() << html;
WriteBufferFromHTTPServerResponse(response, request.getMethod() == HTTPRequest::HTTP_HEAD, keep_alive_timeout).write(html);
}
else if (request.getURI().starts_with("/binary"))
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_OK);
*response.send() << std::string_view(reinterpret_cast<const char *>(gresource_binary_htmlData), gresource_binary_htmlSize);
WriteBufferFromHTTPServerResponse(response, request.getMethod() == HTTPRequest::HTTP_HEAD, keep_alive_timeout).write(reinterpret_cast<const char *>(gresource_binary_htmlData), gresource_binary_htmlSize);
}
else if (request.getURI() == "/js/uplot.js")
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_OK);
*response.send() << std::string_view(reinterpret_cast<const char *>(gresource_uplot_jsData), gresource_uplot_jsSize);
WriteBufferFromHTTPServerResponse(response, request.getMethod() == HTTPRequest::HTTP_HEAD, keep_alive_timeout).write(reinterpret_cast<const char *>(gresource_uplot_jsData), gresource_uplot_jsSize);
}
else
{

View File

@ -16,7 +16,7 @@ private:
public:
WebUIRequestHandler(IServer & server_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response, const ProfileEvents::Event & write_event) override;
};
}

View File

@ -7,7 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
CURL_OUTPUT=$(echo 'SELECT 1 + sleepEachRow(0.00002) FROM numbers(100000)' | \
${CLICKHOUSE_CURL_COMMAND} -v "${CLICKHOUSE_URL}&wait_end_of_query=1&send_progress_in_http_headers=0&max_execution_time=1" --data-binary @- 2>&1)
${CLICKHOUSE_CURL_COMMAND} -vsS "${CLICKHOUSE_URL}&wait_end_of_query=1&send_progress_in_http_headers=0&max_execution_time=1" --data-binary @- 2>&1)
READ_ROWS=$(echo "${CURL_OUTPUT}" | \
grep 'X-ClickHouse-Summary' | \

View File

@ -5,5 +5,5 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
echo 'SELECT 1 FROM numbers(100)' |
${CLICKHOUSE_CURL_COMMAND} -v "${CLICKHOUSE_URL}&wait_end_of_query=1&send_progress_in_http_headers=0" --data-binary @- 2>&1 |
${CLICKHOUSE_CURL_COMMAND} -vsS "${CLICKHOUSE_URL}&wait_end_of_query=1&send_progress_in_http_headers=0" --data-binary @- 2>&1 |
grep 'X-ClickHouse-Summary' | sed 's/,\"elapsed_ns[^}]*//'