LimitReadBuffer

This commit is contained in:
Sema Checherinda 2024-10-01 16:38:14 +02:00
parent e0f083ac0b
commit cea797e254
19 changed files with 128 additions and 102 deletions

View File

@ -94,8 +94,7 @@ void LocalConnection::sendProfileEvents()
Block profile_block; Block profile_block;
state->after_send_profile_events.restart(); state->after_send_profile_events.restart();
next_packet_type = Protocol::Server::ProfileEvents; next_packet_type = Protocol::Server::ProfileEvents;
ProfileEvents::getProfileEvents(server_display_name, state->profile_queue, profile_block, last_sent_snapshots); state->block.emplace(ProfileEvents::getProfileEvents(server_display_name, state->profile_queue, last_sent_snapshots));
state->block.emplace(std::move(profile_block));
} }
void LocalConnection::sendQuery( void LocalConnection::sendQuery(

View File

@ -51,7 +51,7 @@ StatusFile::StatusFile(std::string path_, FillFunction fill_)
std::string contents; std::string contents;
{ {
ReadBufferFromFile in(path, 1024); ReadBufferFromFile in(path, 1024);
LimitReadBuffer limit_in(in, 1024, /* throw_exception */ false, /* exact_limit */ {}); LimitReadBuffer limit_in(in, {.read_no_more = 1024});
readStringUntilEOF(contents, limit_in); readStringUntilEOF(contents, limit_in);
} }

View File

@ -189,11 +189,11 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
if (settings[Setting::http_max_multipart_form_data_size]) if (settings[Setting::http_max_multipart_form_data_size])
read_buffer = std::make_unique<LimitReadBuffer>( read_buffer = std::make_unique<LimitReadBuffer>(
stream, stream,
settings[Setting::http_max_multipart_form_data_size], LimitReadBuffer::Settings{
/* trow_exception */ true, .read_no_more = settings[Setting::http_max_multipart_form_data_size],
/* exact_limit */ std::optional<size_t>(), .expect_eof = true,
"the maximum size of multipart/form-data. " .excetion_hint = "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting",
"This limit can be tuned by 'http_max_multipart_form_data_size' setting"); });
else else
read_buffer = wrapReadBufferReference(stream); read_buffer = wrapReadBufferReference(stream);

View File

@ -33,13 +33,15 @@ void IMySQLReadPacket::readPayloadWithUnpacked(ReadBuffer & in)
void LimitedReadPacket::readPayload(ReadBuffer &in, uint8_t &sequence_id) void LimitedReadPacket::readPayload(ReadBuffer &in, uint8_t &sequence_id)
{ {
LimitReadBuffer limited(in, 10000, /* trow_exception */ true, /* exact_limit */ {}, "too long MySQL packet."); LimitReadBuffer limited(in, {.read_no_more = 10000, .expect_eof = true, .excetion_hint = "too long MySQL packet."});
//10000, /* trow_exception */ true, /* exact_limit */ {}, "too long MySQL packet.");
IMySQLReadPacket::readPayload(limited, sequence_id); IMySQLReadPacket::readPayload(limited, sequence_id);
} }
void LimitedReadPacket::readPayloadWithUnpacked(ReadBuffer & in) void LimitedReadPacket::readPayloadWithUnpacked(ReadBuffer & in)
{ {
LimitReadBuffer limited(in, 10000, /* trow_exception */ true, /* exact_limit */ {}, "too long MySQL packet."); LimitReadBuffer limited(in,{.read_no_more = 10000, .expect_eof = true, .excetion_hint = "too long MySQL packet."});
// 10000, /* trow_exception */ true, /* exact_limit */ {}, "too long MySQL packet.");
IMySQLReadPacket::readPayloadWithUnpacked(limited); IMySQLReadPacket::readPayloadWithUnpacked(limited);
} }

View File

@ -429,7 +429,8 @@ bool BinlogFromFile::tryReadEvent(BinlogEventPtr & to, UInt64 /*ms*/)
EventHeader event_header; EventHeader event_header;
event_header.parse(*in); event_header.parse(*in);
LimitReadBuffer limit_read_buffer(*in, event_header.event_size - EVENT_HEADER_LENGTH, /* throw_exception */ false, /* exact_limit */ {}); //LimitReadBuffer limit_read_buffer(*in, event_header.event_size - EVENT_HEADER_LENGTH, /* throw_exception */ false, /* exact_limit */ {});
LimitReadBuffer limit_read_buffer(*in, {.read_no_more = event_header.event_size - EVENT_HEADER_LENGTH});
MySQLBinlogEventReadBuffer event_payload(limit_read_buffer, checksum_signature_length); MySQLBinlogEventReadBuffer event_payload(limit_read_buffer, checksum_signature_length);
parseEvent(event_header, event_payload); parseEvent(event_header, event_payload);
to = event; to = event;

View File

@ -1,5 +1,8 @@
#include <limits>
#include <IO/LimitReadBuffer.h> #include <IO/LimitReadBuffer.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Core/Settings.h>
#include <Common/logger_useful.h>
namespace DB namespace DB
@ -19,63 +22,50 @@ bool LimitReadBuffer::nextImpl()
/// Let underlying buffer calculate read bytes in `next()` call. /// Let underlying buffer calculate read bytes in `next()` call.
in->position() = position(); in->position() = position();
if (bytes >= limit) if (bytes >= settings.read_no_less)
{ {
if (exact_limit && bytes == *exact_limit) if (settings.expect_eof && bytes > settings.read_no_more)
throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Limit for LimitReadBuffer exceeded: {}", settings.excetion_hint);
if (bytes >= settings.read_no_more)
return false; return false;
if (exact_limit && bytes != *exact_limit) //throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected data, got {} bytes, expected {}", bytes, settings.read_atmost);
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected data, got {} bytes, expected {}", bytes, *exact_limit);
if (throw_exception)
throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Limit for LimitReadBuffer exceeded: {}", exception_message);
return false;
} }
if (!in->next()) if (!in->next())
{ {
if (exact_limit && bytes != *exact_limit) if (bytes < settings.read_no_less)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected EOF, got {} of {} bytes", bytes, *exact_limit); throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected EOF, got {} of {} bytes", bytes, settings.read_no_less);
/// Clearing the buffer with existing data. /// Clearing the buffer with existing data.
BufferBase::set(in->position(), 0, 0); BufferBase::set(in->position(), 0, 0);
return false; return false;
} }
BufferBase::set(in->position(), std::min(in->available(), limit - bytes), 0); BufferBase::set(in->position(), std::min(in->available(), getEffectiveBufferSize() - bytes), 0);
return true; return true;
} }
LimitReadBuffer::LimitReadBuffer(ReadBuffer * in_, bool owns, size_t limit_, bool throw_exception_, LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, Settings settings_)
std::optional<size_t> exact_limit_, std::string exception_message_) : ReadBuffer(in_.position(), 0)
: ReadBuffer(in_ ? in_->position() : nullptr, 0) , in(&in_)
, in(in_) , settings(std::move(settings_))
, owns_in(owns)
, limit(limit_)
, throw_exception(throw_exception_)
, exact_limit(exact_limit_)
, exception_message(std::move(exception_message_))
{ {
chassert(in); chassert(in);
chassert(settings.read_no_less <= settings.read_no_more);
BufferBase::set(in->position(), std::min(in->available(), limit), 0); BufferBase::set(in->position(), std::min(in->available(), getEffectiveBufferSize()), 0);
} }
LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, size_t limit_, bool throw_exception_, LimitReadBuffer::LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, Settings settings_)
std::optional<size_t> exact_limit_, std::string exception_message_) : LimitReadBuffer(*in_, std::move(settings_))
: LimitReadBuffer(&in_, false, limit_, throw_exception_, exact_limit_, exception_message_)
{
}
LimitReadBuffer::LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t limit_, bool throw_exception_,
std::optional<size_t> exact_limit_, std::string exception_message_)
: LimitReadBuffer(in_.release(), true, limit_, throw_exception_, exact_limit_, exception_message_)
{ {
holder = std::move(in_);
} }
@ -84,9 +74,12 @@ LimitReadBuffer::~LimitReadBuffer()
/// Update underlying buffer's position in case when limit wasn't reached. /// Update underlying buffer's position in case when limit wasn't reached.
if (!working_buffer.empty()) if (!working_buffer.empty())
in->position() = position(); in->position() = position();
if (owns_in)
delete in;
} }
size_t LimitReadBuffer::getEffectiveBufferSize() const
{
if (settings.read_no_less)
return settings.read_no_less;
return settings.read_no_more;
}
} }

View File

@ -1,7 +1,10 @@
#pragma once #pragma once
#include <limits>
#include <memory>
#include <base/types.h> #include <base/types.h>
#include <IO/ReadBuffer.h> #include <IO/ReadBuffer.h>
#include "Core/Settings.h"
namespace DB namespace DB
@ -13,26 +16,29 @@ namespace DB
class LimitReadBuffer : public ReadBuffer class LimitReadBuffer : public ReadBuffer
{ {
public: public:
LimitReadBuffer(ReadBuffer & in_, size_t limit_, bool throw_exception_, struct Settings
std::optional<size_t> exact_limit_, std::string exception_message_ = {}); {
LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t limit_, bool throw_exception_, std::optional<size_t> exact_limit_, size_t read_no_less = 0;
std::string exception_message_ = {}); size_t read_no_more = std::numeric_limits<size_t>::max();
bool expect_eof = false;
std::string excetion_hint = {};
};
LimitReadBuffer(ReadBuffer & in_, Settings settings);
LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, Settings settings);
~LimitReadBuffer() override; ~LimitReadBuffer() override;
private: private:
ReadBuffer * in; ReadBuffer * in;
const bool owns_in; std::unique_ptr<ReadBuffer> holder;
const size_t limit; const Settings settings;
const bool throw_exception;
const std::optional<size_t> exact_limit;
const std::string exception_message;
LoggerPtr log;
LimitReadBuffer(ReadBuffer * in_, bool owns, size_t limit_, bool throw_exception_, std::optional<size_t> exact_limit_, std::string exception_message_); LimitReadBuffer(ReadBuffer * in_, bool owns, size_t limit_, bool throw_exception_, std::optional<size_t> exact_limit_, std::string exception_message_);
bool nextImpl() override; bool nextImpl() override;
size_t getEffectiveBufferSize() const;
}; };
} }

View File

@ -172,6 +172,39 @@ public:
next(); next();
} }
size_t rejectDataSafe()
{
size_t before = count();
if (count() == offset())
position() -= offset();
return before - count();
}
size_t rejectDataSafe(size_t partial_data_to_reject)
{
size_t before = count();
if (count() == offset() && offset() >= partial_data_to_reject)
position() -= partial_data_to_reject;
return before - count();
}
bool rejectDataHard()
{
size_t before = count();
position() -= offset();
return before - count();
}
size_t rejectDataHard(size_t partial_data_to_reject)
{
size_t before = count();
if (offset() >= partial_data_to_reject)
{
position() -= partial_data_to_reject;
}
return before - count();
}
protected: protected:
WriteBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) {} WriteBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) {}

View File

@ -175,7 +175,7 @@ void WriteBufferFromPocoSocketChunked::nextImpl()
LOG_TEST(log, "{} -> {} Chunk send started. Message {}, size {}", LOG_TEST(log, "{} -> {} Chunk send started. Message {}, size {}",
ourAddress().toString(), peerAddress().toString(), ourAddress().toString(), peerAddress().toString(),
static_cast<unsigned int>(*(reinterpret_cast<char *>(chunk_size_ptr) + sizeof(*chunk_size_ptr))), static_cast<unsigned int>(*(reinterpret_cast<char *>(chunk_size_ptr) + sizeof(*chunk_size_ptr))),
*chunk_size_ptr); *chunk_size_ptr);
} }
else else
LOG_TEST(log, "{} -> {} Chunk send continued. Size {}", ourAddress().toString(), peerAddress().toString(), *chunk_size_ptr); LOG_TEST(log, "{} -> {} Chunk send continued. Size {}", ourAddress().toString(), peerAddress().toString(), *chunk_size_ptr);

View File

@ -381,9 +381,10 @@ AsynchronousInsertQueue::pushQueryWithInlinedData(ASTPtr query, ContextPtr query
LimitReadBuffer limit_buf( LimitReadBuffer limit_buf(
*read_buf, *read_buf,
query_context->getSettingsRef()[Setting::async_insert_max_data_size], {.read_no_more = query_context->getSettingsRef()[Setting::async_insert_max_data_size]});
/*throw_exception=*/false, // query_context->getSettingsRef()[Setting::async_insert_max_data_size],
/*exact_limit=*/{}); // /*throw_exception=*/false,
// /*exact_limit=*/{});
{ {
WriteBufferFromString write_buf(bytes); WriteBufferFromString write_buf(bytes);

View File

@ -3,6 +3,7 @@
#include <Common/MemoryTracker.h> #include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/ConcurrentBoundedQueue.h> #include <Common/ConcurrentBoundedQueue.h>
#include "Core/Block.h"
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
@ -98,11 +99,7 @@ static void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::Mutabl
columns[i]->insert(snapshot.peak_memory_usage); columns[i]->insert(snapshot.peak_memory_usage);
} }
void getProfileEvents( DB::Block getSampleBlock()
const String & host_name,
DB::InternalProfileEventsQueuePtr profile_queue,
DB::Block & block,
ThreadIdToCountersSnapshot & last_sent_snapshots)
{ {
using namespace DB; using namespace DB;
static const NamesAndTypesList column_names_and_types = { static const NamesAndTypesList column_names_and_types = {
@ -118,8 +115,16 @@ void getProfileEvents(
for (auto const & name_and_type : column_names_and_types) for (auto const & name_and_type : column_names_and_types)
temp_columns.emplace_back(name_and_type.type, name_and_type.name); temp_columns.emplace_back(name_and_type.type, name_and_type.name);
block = std::move(temp_columns); return Block(std::move(temp_columns));
MutableColumns columns = block.mutateColumns(); }
DB::Block getProfileEvents(
const String & host_name,
DB::InternalProfileEventsQueuePtr profile_queue,
ThreadIdToCountersSnapshot & last_sent_snapshots)
{
using namespace DB;
auto thread_group = CurrentThread::getGroup(); auto thread_group = CurrentThread::getGroup();
ThreadIdToCountersSnapshot new_snapshots; ThreadIdToCountersSnapshot new_snapshots;
@ -139,6 +144,9 @@ void getProfileEvents(
} }
last_sent_snapshots = std::move(new_snapshots); last_sent_snapshots = std::move(new_snapshots);
auto block = getSampleBlock();
MutableColumns columns = block.mutateColumns();
dumpProfileEvents(group_snapshot, columns, host_name); dumpProfileEvents(group_snapshot, columns, host_name);
dumpMemoryTracker(group_snapshot, columns, host_name); dumpMemoryTracker(group_snapshot, columns, host_name);
@ -154,6 +162,8 @@ void getProfileEvents(
bool empty = columns[0]->empty(); bool empty = columns[0]->empty();
if (!empty) if (!empty)
block.setColumns(std::move(columns)); block.setColumns(std::move(columns));
return block;
} }
} }

View File

@ -25,10 +25,11 @@ using ThreadIdToCountersSnapshot = std::unordered_map<UInt64, Counters::Snapshot
/// Dumps profile events to columns Map(String, UInt64) /// Dumps profile events to columns Map(String, UInt64)
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true); void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true);
void getProfileEvents( DB::Block getSampleBlock();
DB::Block getProfileEvents(
const String & host_name, const String & host_name,
DB::InternalProfileEventsQueuePtr profile_queue, DB::InternalProfileEventsQueuePtr profile_queue,
DB::Block & block,
ThreadIdToCountersSnapshot & last_sent_snapshots); ThreadIdToCountersSnapshot & last_sent_snapshots);
/// This is for ProfileEvents packets. /// This is for ProfileEvents packets.

View File

@ -1626,7 +1626,7 @@ void executeQuery(
/// If not - copy enough data into 'parse_buf'. /// If not - copy enough data into 'parse_buf'.
WriteBufferFromVector<PODArray<char>> out(parse_buf); WriteBufferFromVector<PODArray<char>> out(parse_buf);
LimitReadBuffer limit(istr, max_query_size + 1, /* trow_exception */ false, /* exact_limit */ {}); LimitReadBuffer limit(istr, {.read_no_more = max_query_size + 1});
copyData(limit, out); copyData(limit, out);
out.finalize(); out.finalize();

View File

@ -71,23 +71,11 @@ void BlockIO::onException()
void BlockIO::onCancelOrConnectionLoss() void BlockIO::onCancelOrConnectionLoss()
{ {
/// Query was not finished gracefully, so we should call exception_callback /// Query was not finished gracefully, so we should call exception_callback
/// But we don't have a real exception if (exception_callback)
try exception_callback(/* log_error */ false);
{
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled or a client has unexpectedly dropped the connection");
}
catch (...)
{
if (exception_callback)
{
exception_callback(/* log_error */ false);
}
/// destroy pipeline and write buffers with an exception context
pipeline.cancel();
pipeline.reset();
}
pipeline.cancel();
pipeline.reset();
} }
void BlockIO::setAllDataSent() const void BlockIO::setAllDataSent() const

View File

@ -60,8 +60,7 @@ HTTPServerRequest::HTTPServerRequest(HTTPContextPtr context, HTTPServerResponse
else if (hasContentLength()) else if (hasContentLength())
{ {
size_t content_length = getContentLength(); size_t content_length = getContentLength();
stream = std::make_unique<LimitReadBuffer>(std::move(in), content_length, stream = std::make_unique<LimitReadBuffer>(std::move(in), LimitReadBuffer::Settings{.read_no_less = content_length, .read_no_more = content_length, .expect_eof = true});
/* trow_exception */ true, /* exact_limit */ content_length);
} }
else if (getMethod() != HTTPRequest::HTTP_GET && getMethod() != HTTPRequest::HTTP_HEAD && getMethod() != HTTPRequest::HTTP_DELETE) else if (getMethod() != HTTPRequest::HTTP_GET && getMethod() != HTTPRequest::HTTP_HEAD && getMethod() != HTTPRequest::HTTP_DELETE)
{ {

View File

@ -257,17 +257,11 @@ bool WriteBufferFromHTTPServerResponse::cancelWithException(HTTPServerRequest &
{ {
data_sent |= (compression_buffer->count() != compression_buffer->offset()); data_sent |= (compression_buffer->count() != compression_buffer->offset());
if (!data_sent) if (!data_sent)
{ compression_discarded_data = compression_buffer->rejectDataSafe();
compression_discarded_data = std::distance(compression_buffer->buffer().begin(), compression_buffer->position());
compression_buffer->position() = compression_buffer->buffer().begin();
}
} }
data_sent |= (count() != offset()); data_sent |= (count() != offset());
if (!data_sent) if (!data_sent)
{ discarded_data = rejectDataSafe();
discarded_data = std::distance(buffer().begin(), position());
position() = buffer().begin();
}
bool is_response_sent = response.sent(); bool is_response_sent = response.sent();
// proper senging bad http code // proper senging bad http code

View File

@ -275,7 +275,7 @@ void MySQLHandler::run()
payload.readStrict(command); payload.readStrict(command);
// For commands which are executed without MemoryTracker. // For commands which are executed without MemoryTracker.
LimitReadBuffer limited_payload(payload, 10000, /* trow_exception */ true, /* exact_limit */ {}, "too long MySQL packet."); LimitReadBuffer limited_payload(payload, {.read_no_more = 1000, .expect_eof = true, .excetion_hint = "too long MySQL packet."});
LOG_DEBUG(log, "Received command: {}. Connection id: {}.", LOG_DEBUG(log, "Received command: {}. Connection id: {}.",
static_cast<int>(static_cast<unsigned char>(command)), connection_id); static_cast<int>(static_cast<unsigned char>(command)), connection_id);

View File

@ -1361,8 +1361,7 @@ void TCPHandler::sendExtremes(const Block & extremes)
void TCPHandler::sendProfileEvents() void TCPHandler::sendProfileEvents()
{ {
Stopwatch stopwatch; Stopwatch stopwatch;
Block block; Block block = ProfileEvents::getProfileEvents(host_name, state.profile_queue, last_sent_snapshots);
ProfileEvents::getProfileEvents(host_name, state.profile_queue, block, last_sent_snapshots);
if (block.rows() != 0) if (block.rows() != 0)
{ {
initProfileEventsBlockOutput(block); initProfileEventsBlockOutput(block);
@ -1429,7 +1428,7 @@ bool TCPHandler::receiveProxyHeader()
/// Only PROXYv1 is supported. /// Only PROXYv1 is supported.
/// Validation of protocol is not fully performed. /// Validation of protocol is not fully performed.
LimitReadBuffer limit_in(*in, 107, /* trow_exception */ true, /* exact_limit */ {}); /// Maximum length from the specs. LimitReadBuffer limit_in(*in, {.read_no_more=107, .expect_eof=true}); /// Maximum length from the specs.
assertString("PROXY ", limit_in); assertString("PROXY ", limit_in);

View File

@ -134,7 +134,7 @@ private:
if (limited_by_file_size) if (limited_by_file_size)
{ {
limited.emplace(*plain, file_size - offset, /* trow_exception */ false, /* exact_limit */ std::optional<size_t>()); limited.emplace(*plain, LimitReadBuffer::Settings{.read_no_more = file_size - offset});
compressed.emplace(*limited); compressed.emplace(*limited);
} }
else else