fix multipart requests
This commit is contained in:
parent 592af6d652
commit e660c0838c
@@ -51,7 +51,7 @@ StatusFile::StatusFile(std::string path_, FillFunction fill_)
     std::string contents;
     {
         ReadBufferFromFile in(path, 1024);
-        LimitReadBuffer limit_in(in, 1024, /* trow_exception */ false, /* exact_limit */ false);
+        LimitReadBuffer limit_in(in, 1024, /* trow_exception */ false, /* exact_limit */ {});
         readStringUntilEOF(contents, limit_in);
     }

@@ -34,7 +34,7 @@ ExternalTableDataPtr BaseExternalTable::getData(ContextPtr context)
 {
     initReadBuffer();
     initSampleBlock();
-    auto input = context->getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);
+    auto input = context->getInputFormat(format, *read_buffer, sample_block, context->getSettingsRef().get("max_block_size").get<UInt64>());

     auto data = std::make_unique<ExternalTableData>();
     data->pipe = std::make_unique<QueryPipelineBuilder>();

@@ -135,7 +135,9 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
     if (settings.http_max_multipart_form_data_size)
         read_buffer = std::make_unique<LimitReadBuffer>(
             stream, settings.http_max_multipart_form_data_size,
-            true, "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting");
+            /* trow_exception */ true, /* exact_limit */ std::optional<size_t>(),
+            "the maximum size of multipart/form-data. "
+            "This limit can be tuned by 'http_max_multipart_form_data_size' setting");
     else
         read_buffer = wrapReadBufferReference(stream);

@@ -33,13 +33,13 @@ void IMySQLReadPacket::readPayloadWithUnpacked(ReadBuffer & in)

 void LimitedReadPacket::readPayload(ReadBuffer &in, uint8_t &sequence_id)
 {
-    LimitReadBuffer limited(in, 10000, /* trow_exception */ true, /* exact_limit */ false, "too long MySQL packet.");
+    LimitReadBuffer limited(in, 10000, /* trow_exception */ true, /* exact_limit */ {}, "too long MySQL packet.");
     IMySQLReadPacket::readPayload(limited, sequence_id);
 }

 void LimitedReadPacket::readPayloadWithUnpacked(ReadBuffer & in)
 {
-    LimitReadBuffer limited(in, 10000, /* trow_exception */ true, /* exact_limit */ false, "too long MySQL packet.");
+    LimitReadBuffer limited(in, 10000, /* trow_exception */ true, /* exact_limit */ {}, "too long MySQL packet.");
     IMySQLReadPacket::readPayloadWithUnpacked(limited);
 }

@@ -22,11 +22,11 @@ bool LimitReadBuffer::nextImpl()

     if (bytes >= limit)
     {
-        if (exact_limit && bytes == limit)
+        if (exact_limit && bytes == *exact_limit)
            return false;

-        if (exact_limit && bytes != limit)
-            throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected data, got {} bytes, expected {}", bytes, limit);
+        if (exact_limit && bytes != *exact_limit)
+            throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected data, got {} bytes, expected {}", bytes, *exact_limit);

        if (throw_exception)
            throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Limit for LimitReadBuffer exceeded: {}", exception_message);

@@ -36,8 +36,8 @@ bool LimitReadBuffer::nextImpl()

     if (!in->next())
     {
-        if (exact_limit && bytes != limit)
-            throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected EOF, got {} of {} bytes", bytes, limit);
+        if (exact_limit && bytes != *exact_limit)
+            throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected EOF, got {} of {} bytes", bytes, *exact_limit);
         /// Clearing the buffer with existing data.
         set(in->position(), 0);
         return false;

@@ -52,7 +52,8 @@ bool LimitReadBuffer::nextImpl()
 }


-LimitReadBuffer::LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_, bool exact_limit_, std::string exception_message_)
+LimitReadBuffer::LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_,
+                                 std::optional<size_t> exact_limit_, std::string exception_message_)
     : ReadBuffer(in_ ? in_->position() : nullptr, 0)
     , in(in_)
     , owns_in(owns)
@@ -71,13 +72,15 @@ LimitReadBuffer::LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, boo
 }


-LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, bool exact_limit_, std::string exception_message_)
+LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_,
+                                 std::optional<size_t> exact_limit_, std::string exception_message_)
     : LimitReadBuffer(&in_, false, limit_, throw_exception_, exact_limit_, exception_message_)
 {
 }


-LimitReadBuffer::LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, UInt64 limit_, bool throw_exception_, bool exact_limit_, std::string exception_message_)
+LimitReadBuffer::LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, UInt64 limit_, bool throw_exception_,
+                                 std::optional<size_t> exact_limit_, std::string exception_message_)
     : LimitReadBuffer(in_.release(), true, limit_, throw_exception_, exact_limit_, exception_message_)
 {
 }

@@ -13,8 +13,9 @@ namespace DB
 class LimitReadBuffer : public ReadBuffer
 {
 public:
-    LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, bool exact_limit_, std::string exception_message_ = {});
-    LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, UInt64 limit_, bool throw_exception_, bool exact_limit_,
+    LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_,
+                    std::optional<size_t> exact_limit_, std::string exception_message_ = {});
+    LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, UInt64 limit_, bool throw_exception_, std::optional<size_t> exact_limit_,
                     std::string exception_message_ = {});
     ~LimitReadBuffer() override;

@@ -24,10 +25,10 @@ private:

     UInt64 limit;
     bool throw_exception;
-    bool exact_limit;
+    std::optional<size_t> exact_limit;
     std::string exception_message;

-    LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_, bool exact_limit_, std::string exception_message_);
+    LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_, std::optional<size_t> exact_limit_, std::string exception_message_);

     bool nextImpl() override;
 };

@@ -24,13 +24,13 @@ int main(int argc, char ** argv)

     writeCString("--- first ---\n", out);
     {
-        LimitReadBuffer limit_in(in, limit, false);
+        LimitReadBuffer limit_in(in, limit, /* trow_exception */ false, /* exact_limit */ {});
         copyData(limit_in, out);
     }

     writeCString("\n--- second ---\n", out);
     {
-        LimitReadBuffer limit_in(in, limit, false);
+        LimitReadBuffer limit_in(in, limit, /* trow_exception */ false, /* exact_limit */ {});
         copyData(limit_in, out);
     }

@@ -27,7 +27,7 @@ try

     ReadBuffer in(src.data(), src.size(), 0);

-    LimitReadBuffer limit_in(in, 1, false);
+    LimitReadBuffer limit_in(in, 1, /* trow_exception */ false, /* exact_limit */ {});

     {
         WriteBufferFromString out(dst);

@@ -55,7 +55,7 @@ try
         char x;
         readChar(x, in);

-        LimitReadBuffer limit_in(in, 1, false);
+        LimitReadBuffer limit_in(in, 1, /* trow_exception */ false, /* exact_limit */ {});

         copyData(limit_in, out);

@@ -85,7 +85,7 @@ try
     ReadBuffer in(src.data(), src.size(), 0);

     {
-        LimitReadBuffer limit_in(in, 1, false);
+        LimitReadBuffer limit_in(in, 1, /* trow_exception */ false, /* exact_limit */ {});

         char x;
         readChar(x, limit_in);

@@ -1237,7 +1237,7 @@ void executeQuery(

         /// If not - copy enough data into 'parse_buf'.
         WriteBufferFromVector<PODArray<char>> out(parse_buf);
-        LimitReadBuffer limit(istr, max_query_size + 1, /* trow_exception */ false, /* exact_limit */ false);
+        LimitReadBuffer limit(istr, max_query_size + 1, /* trow_exception */ false, /* exact_limit */ {});
         copyData(limit, out);
         out.finalize();

@@ -20,6 +20,11 @@
 namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int CANNOT_READ_ALL_DATA;
+}
+
 namespace
 {

@@ -229,6 +234,11 @@ void HTMLForm::readMultipart(ReadBuffer & in_, PartHandler & handler)
         if (!in.skipToNextBoundary())
             break;
     }
+
+    /// It's important to check, because we could get "fake" EOF and incomplete request if a client suddenly died in the middle.
+    if (!in.isActualEOF())
+        throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected EOF, "
+                        "did not find the last boundary while parsing a multipart HTTP request");
 }

@@ -244,7 +254,8 @@ bool HTMLForm::MultipartReadBuffer::skipToNextBoundary()
     if (in.eof())
         return false;

-    assert(boundary_hit);
+    chassert(boundary_hit);
+    chassert(!found_last_boundary);

     boundary_hit = false;

@@ -255,7 +266,8 @@ bool HTMLForm::MultipartReadBuffer::skipToNextBoundary()
         {
             set(in.position(), 0);
             next(); /// We need to restrict our buffer to size of next available line.
-            return !startsWith(line, boundary + "--");
+            found_last_boundary = startsWith(line, boundary + "--");
+            return !found_last_boundary;
         }
     }

|
@ -108,10 +108,13 @@ public:
|
||||
/// Returns false if last boundary found.
|
||||
bool skipToNextBoundary();
|
||||
|
||||
bool isActualEOF() const { return found_last_boundary; }
|
||||
|
||||
private:
|
||||
PeekableReadBuffer in;
|
||||
const std::string boundary;
|
||||
bool boundary_hit = true;
|
||||
bool found_last_boundary = false;
|
||||
|
||||
std::string readLine(bool append_crlf);
|
||||
|
||||
|
@@ -56,12 +56,17 @@ HTTPServerRequest::HTTPServerRequest(HTTPContextPtr context, HTTPServerResponse
     if (getChunkedTransferEncoding())
         stream = std::make_unique<HTTPChunkedReadBuffer>(std::move(in), context->getMaxChunkSize());
     else if (hasContentLength())
-        stream = std::make_unique<LimitReadBuffer>(std::move(in), getContentLength(), /* trow_exception */ true, /* exact_limit */ true);
+    {
+        size_t content_length = getContentLength();
+        stream = std::make_unique<LimitReadBuffer>(std::move(in), content_length,
+            /* trow_exception */ true, /* exact_limit */ content_length);
+    }
     else if (getMethod() != HTTPRequest::HTTP_GET && getMethod() != HTTPRequest::HTTP_HEAD && getMethod() != HTTPRequest::HTTP_DELETE)
     {
         stream = std::move(in);
-        LOG_WARNING(&Poco::Logger::get("HTTPServerRequest"), "Got an HTTP request with no content length, "
-            "it may be impossible to distinguish graceful EOF from abnormal connection loss");
+        if (!startsWith(getContentType(), "multipart/form-data"))
+            LOG_WARNING(&Poco::Logger::get("HTTPServerRequest"), "Got an HTTP request with no content length "
+                "and no chunked/multipart encoding, it may be impossible to distinguish graceful EOF from abnormal connection loss");
     }
     else
         /// We have to distinguish empty buffer and nullptr.

@@ -155,7 +155,7 @@ void MySQLHandler::run()
             payload.readStrict(command);

             // For commands which are executed without MemoryTracker.
-            LimitReadBuffer limited_payload(payload, 10000, /* trow_exception */ true, /* exact_limit */ false, "too long MySQL packet.");
+            LimitReadBuffer limited_payload(payload, 10000, /* trow_exception */ true, /* exact_limit */ {}, "too long MySQL packet.");

             LOG_DEBUG(log, "Received command: {}. Connection id: {}.",
                 static_cast<int>(static_cast<unsigned char>(command)), connection_id);

@@ -1057,7 +1057,7 @@ bool TCPHandler::receiveProxyHeader()
     /// Only PROXYv1 is supported.
     /// Validation of protocol is not fully performed.

-    LimitReadBuffer limit_in(*in, 107, /* trow_exception */ true, /* exact_limit */ false); /// Maximum length from the specs.
+    LimitReadBuffer limit_in(*in, 107, /* trow_exception */ true, /* exact_limit */ {}); /// Maximum length from the specs.

     assertString("PROXY ", limit_in);

@@ -118,7 +118,7 @@ private:

         if (limited_by_file_size)
         {
-            limited.emplace(*plain, file_size - offset, /* trow_exception */ false, /* exact_limit */ false);
+            limited.emplace(*plain, file_size - offset, /* trow_exception */ false, /* exact_limit */ std::optional<size_t>());
             compressed.emplace(*limited);
         }
         else

@@ -10,24 +10,32 @@ export TEST_MARK="02435_insert_${CLICKHOUSE_DATABASE}_"

 $CLICKHOUSE_CLIENT -q 'select * from numbers(5000000) format TSV' > $DATA_FILE
 $CLICKHOUSE_CLIENT -q 'create table dedup_test(A Int64) Engine = MergeTree order by A settings non_replicated_deduplication_window=1000;'
 $CLICKHOUSE_CURL -sS -X POST --data-binary @- "$CLICKHOUSE_URL&max_insert_block_size=100000&input_format_parallel_parsing=0&query=insert+into+dedup_test+format+TSV" < $DATA_FILE

+function insert_data
+{
+    SETTINGS="query_id=$ID&max_insert_block_size=100000&input_format_parallel_parsing=0"
+    TRASH_SETTINGS="query_id=$ID&input_format_parallel_parsing=0&max_threads=1&max_insert_threads=1&max_insert_block_size=1100000&max_block_size=1100000&min_insert_block_size_bytes=0&min_insert_block_size_rows=1100000&max_insert_block_size=1100000"
+    TYPE=$(( RANDOM % 3 ))
+    if [[ "$TYPE" -eq 0 ]]; then
+        $CLICKHOUSE_CURL -sS -X POST --data-binary @- "$CLICKHOUSE_URL&$SETTINGS&query=insert+into+dedup_test+format+TSV" < $DATA_FILE
+    elif [[ "$TYPE" -eq 1 ]]; then
+        $CLICKHOUSE_CURL -sS -X POST -H "Transfer-Encoding: chunked" --data-binary @- "$CLICKHOUSE_URL&$SETTINGS&query=insert+into+dedup_test+format+TSV" < $DATA_FILE
+    else
+        $CLICKHOUSE_CURL -sS -F 'file=@-' "$CLICKHOUSE_URL&$TRASH_SETTINGS&file_format=TSV&file_types=UInt64" -X POST --form-string 'query=insert into dedup_test select * from file' < $DATA_FILE
+    fi
+}
+
+export -f insert_data
+
+insert_data
+$CLICKHOUSE_CLIENT -q 'select count() from dedup_test'
+
 function thread_insert
 {
     # supress "Killed" messages from bash
-    function wrap
-    {
-        if (( RANDOM % 2 )); then
-            $CLICKHOUSE_CURL -sS -X POST --data-binary @- "$CLICKHOUSE_URL&query_id=$ID&max_insert_block_size=100000&input_format_parallel_parsing=0&query=insert+into+dedup_test+format+TSV" < $DATA_FILE
-        else
-            $CLICKHOUSE_CURL -sS -X POST -H "Transfer-Encoding: chunked" --data-binary @- "$CLICKHOUSE_URL&query_id=$ID&max_insert_block_size=100000&input_format_parallel_parsing=0&query=insert+into+dedup_test+format+TSV" < $DATA_FILE
-        fi
-    }
-    export -f wrap
     while true; do
         export ID="$TEST_MARK$RANDOM"
-        bash -c wrap 2>&1| grep -Fav "Killed"
+        bash -c insert_data 2>&1| grep -Fav "Killed"
     done
 }

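For reference, not part of the commit: the multipart path exercised by the else branch above (upload a file as an external table and insert from it in one multipart/form-data request) can also be hit by hand. A hypothetical invocation reusing the same $CLICKHOUSE_CURL/$CLICKHOUSE_URL helpers and the dedup_test table would be:

echo -e '1\n2\n3' | $CLICKHOUSE_CURL -sS -F 'file=@-' \
    "$CLICKHOUSE_URL&file_format=TSV&file_types=UInt64" \
    -X POST --form-string 'query=insert into dedup_test select * from file'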