mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Try to fix "query in form data" feature while still tracking limits for external tables (try №2) #2482
This commit is contained in:
parent
47ea011f27
commit
f1ba2f9a33
@ -444,17 +444,14 @@ void HTTPHandler::processQuery(
|
||||
return false;
|
||||
};
|
||||
|
||||
/// Used in case of POST request with form-data, but it not to be expectd to be deleted after that scope
|
||||
/// Used in case of POST request with form-data, but it isn't expected to be deleted after that scope.
|
||||
std::string full_query;
|
||||
|
||||
/// Support for "external data for query processing".
|
||||
if (startsWith(request.getContentType().data(), "multipart/form-data"))
|
||||
{
|
||||
context.setExternalTablesInitializer([&params, &request, &istr] (Context & context_query)
|
||||
{
|
||||
ExternalTablesHandler handler(context_query, params);
|
||||
params.load(request, istr, handler);
|
||||
});
|
||||
ExternalTablesHandler handler(context, params);
|
||||
params.load(request, istr, handler);
|
||||
|
||||
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
|
||||
reserved_param_suffixes.emplace_back("_format");
|
||||
@ -463,12 +460,9 @@ void HTTPHandler::processQuery(
|
||||
|
||||
/// Params are of both form params POST and uri (GET params)
|
||||
for (const auto & it : params)
|
||||
{
|
||||
if (it.first == "query")
|
||||
{
|
||||
full_query += it.second;
|
||||
}
|
||||
}
|
||||
|
||||
in = std::make_unique<ReadBufferFromString>(full_query);
|
||||
}
|
||||
else
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <IO/copyData.h>
|
||||
#include <IO/ReadBufferFromIStream.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/LimitReadBuffer.h>
|
||||
#include <Storages/StorageMemory.h>
|
||||
#include <Client/Connection.h>
|
||||
#include <Poco/Net/HTMLForm.h>
|
||||
@ -45,7 +46,7 @@ public:
|
||||
/// Initialize read_buffer, depending on the data source. By default, does nothing.
|
||||
virtual void initReadBuffer() {}
|
||||
|
||||
/// Get the table data - a pair (a thread with the contents of the table, the name of the table)
|
||||
/// Get the table data - a pair (a stream with the contents of the table, the name of the table)
|
||||
ExternalTableData getData(const Context & context)
|
||||
{
|
||||
initReadBuffer();
|
||||
@ -168,13 +169,21 @@ public:
|
||||
class ExternalTablesHandler : public Poco::Net::PartHandler, BaseExternalTable
|
||||
{
|
||||
public:
|
||||
|
||||
ExternalTablesHandler(Context & context_, const Poco::Net::NameValueCollection & params_) : context(context_), params(params_) { }
|
||||
|
||||
void handlePart(const Poco::Net::MessageHeader & header, std::istream & stream)
|
||||
{
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
|
||||
/// The buffer is initialized here, not in the virtual function initReadBuffer
|
||||
read_buffer = std::make_unique<ReadBufferFromIStream>(stream);
|
||||
read_buffer_impl = std::make_unique<ReadBufferFromIStream>(stream);
|
||||
|
||||
if (settings.http_max_multipart_form_data_size)
|
||||
read_buffer = std::make_unique<LimitReadBuffer>(
|
||||
*read_buffer_impl, settings.http_max_multipart_form_data_size,
|
||||
true, "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting");
|
||||
else
|
||||
read_buffer = std::move(read_buffer_impl);
|
||||
|
||||
/// Retrieve a collection of parameters from MessageHeader
|
||||
Poco::Net::NameValueCollection content;
|
||||
@ -199,7 +208,7 @@ public:
|
||||
StoragePtr storage = StorageMemory::create(data.second, ColumnsDescription{columns});
|
||||
storage->startup();
|
||||
context.addExternalTable(data.second, storage);
|
||||
BlockOutputStreamPtr output = storage->write(ASTPtr(), context.getSettingsRef());
|
||||
BlockOutputStreamPtr output = storage->write(ASTPtr(), settings);
|
||||
|
||||
/// Write data
|
||||
data.first->readPrefix();
|
||||
@ -216,6 +225,7 @@ public:
|
||||
private:
|
||||
Context & context;
|
||||
const Poco::Net::NameValueCollection & params;
|
||||
std::unique_ptr<ReadBufferFromIStream> read_buffer_impl;
|
||||
};
|
||||
|
||||
|
||||
|
@ -30,7 +30,7 @@ StatusFile::StatusFile(const std::string & path_)
|
||||
std::string contents;
|
||||
{
|
||||
ReadBufferFromFile in(path, 1024);
|
||||
LimitReadBuffer limit_in(in, 1024);
|
||||
LimitReadBuffer limit_in(in, 1024, false);
|
||||
readStringUntilEOF(contents, limit_in);
|
||||
}
|
||||
|
||||
|
@ -1,15 +1,30 @@
|
||||
#include <IO/LimitReadBuffer.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LIMIT_EXCEEDED;
|
||||
}
|
||||
|
||||
|
||||
bool LimitReadBuffer::nextImpl()
|
||||
{
|
||||
/// Let underlying buffer calculate read bytes in `next()` call.
|
||||
in.position() = position();
|
||||
|
||||
if (bytes >= limit || !in.next())
|
||||
if (bytes >= limit)
|
||||
{
|
||||
if (throw_exception)
|
||||
throw Exception("Limit for LimitReadBuffer exceeded: " + exception_message, ErrorCodes::LIMIT_EXCEEDED);
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!in.next())
|
||||
return false;
|
||||
|
||||
working_buffer = in.buffer();
|
||||
@ -21,8 +36,8 @@ bool LimitReadBuffer::nextImpl()
|
||||
}
|
||||
|
||||
|
||||
LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, size_t limit_)
|
||||
: ReadBuffer(in_.position(), 0), in(in_), limit(limit_)
|
||||
LimitReadBuffer::LimitReadBuffer(ReadBuffer & in, size_t limit, bool throw_exception, std::string exception_message)
|
||||
: ReadBuffer(in.position(), 0), in(in), limit(limit), throw_exception(throw_exception), exception_message(std::move(exception_message))
|
||||
{
|
||||
size_t remaining_bytes_in_buffer = in.buffer().end() - in.position();
|
||||
if (remaining_bytes_in_buffer > limit)
|
||||
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstddef>
|
||||
#include <Core/Types.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
|
||||
|
||||
@ -13,12 +13,14 @@ class LimitReadBuffer : public ReadBuffer
|
||||
{
|
||||
private:
|
||||
ReadBuffer & in;
|
||||
size_t limit;
|
||||
UInt64 limit;
|
||||
bool throw_exception;
|
||||
std::string exception_message;
|
||||
|
||||
bool nextImpl() override;
|
||||
|
||||
public:
|
||||
LimitReadBuffer(ReadBuffer & in_, size_t limit_);
|
||||
LimitReadBuffer(ReadBuffer & in, UInt64 limit, bool throw_exception, std::string exception_message = {});
|
||||
~LimitReadBuffer() override;
|
||||
};
|
||||
|
||||
|
@ -24,13 +24,13 @@ int main(int argc, char ** argv)
|
||||
|
||||
writeCString("--- first ---\n", out);
|
||||
{
|
||||
LimitReadBuffer limit_in(in, limit);
|
||||
LimitReadBuffer limit_in(in, limit, false);
|
||||
copyData(limit_in, out);
|
||||
}
|
||||
|
||||
writeCString("\n--- second ---\n", out);
|
||||
{
|
||||
LimitReadBuffer limit_in(in, limit);
|
||||
LimitReadBuffer limit_in(in, limit, false);
|
||||
copyData(limit_in, out);
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,7 @@ try
|
||||
|
||||
ReadBuffer in(&src[0], src.size(), 0);
|
||||
|
||||
LimitReadBuffer limit_in(in, 1);
|
||||
LimitReadBuffer limit_in(in, 1, false);
|
||||
|
||||
{
|
||||
WriteBufferFromString out(dst);
|
||||
@ -57,7 +57,7 @@ try
|
||||
char x;
|
||||
readChar(x, in);
|
||||
|
||||
LimitReadBuffer limit_in(in, 1);
|
||||
LimitReadBuffer limit_in(in, 1, false);
|
||||
|
||||
copyData(limit_in, out);
|
||||
|
||||
@ -102,7 +102,7 @@ try
|
||||
ReadBuffer in(&src[0], src.size(), 0);
|
||||
|
||||
{
|
||||
LimitReadBuffer limit_in(in, 1);
|
||||
LimitReadBuffer limit_in(in, 1, false);
|
||||
|
||||
char x;
|
||||
readChar(x, limit_in);
|
||||
|
@ -282,6 +282,7 @@ struct Settings
|
||||
M(SettingBool, prefer_localhost_replica, 1, "1 - always send query to local replica, if it exists. 0 - choose replica to send query between local and remote ones according to load_balancing") \
|
||||
M(SettingUInt64, max_fetch_partition_retries_count, 5, "Amount of retries while fetching partition from another host.") \
|
||||
M(SettingBool, asterisk_left_columns_only, 0, "If it is set to true, the asterisk only return left of join query.") \
|
||||
M(SettingUInt64, http_max_multipart_form_data_size, 1024 * 1024 * 1024, "Limit on size of multipart/form-data content. Note that content is parsed and external tables are created in memory before start of query execution. And this is the only limit that has effect on that stage (limits on max memory usage and max execution time have no effect while reading HTTP form data).") \
|
||||
|
||||
|
||||
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
|
||||
|
Loading…
Reference in New Issue
Block a user