This commit is contained in:
Vladimir Chebotarev 2016-02-12 11:53:03 +03:00
commit 0f6ad5450f
11 changed files with 298 additions and 38 deletions

View File

@ -1,6 +1,9 @@
#pragma once
#include <experimental/optional>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/DeflatingStream.h>
#include <DB/Common/Exception.h>
@ -15,6 +18,7 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_WRITE_TO_OSTREAM;
extern const int LOGICAL_ERROR;
}
@ -24,21 +28,55 @@ namespace ErrorCodes
* но до вывода первых данных клиенту, можно было изменить какие-нибудь HTTP заголовки (например, код ответа).
* (После вызова Poco::Net::HTTPServerResponse::send() изменить заголовки уже нельзя.)
* То есть, суть в том, чтобы вызывать метод Poco::Net::HTTPServerResponse::send() не сразу.
*
* Дополнительно, позволяет сжимать тело HTTP-ответа, выставив соответствующий заголовок Content-Encoding.
*/
class WriteBufferFromHTTPServerResponse : public BufferWithOwnMemory<WriteBuffer>
{
private:
Poco::Net::HTTPServerResponse & response;
std::ostream * ostr = nullptr;
bool compress;
Poco::DeflatingStreamBuf::StreamType compression_method;
int compression_level = Z_DEFAULT_COMPRESSION;
std::ostream * response_ostr = nullptr; /// Сюда записывается тело HTTP ответа, возможно, сжатое.
std::experimental::optional<Poco::DeflatingOutputStream> deflating_stream;
std::ostream * ostr = nullptr; /// Куда записывать несжатое тело HTTP ответа. Указывает туда же, куда response_ostr или на deflating_stream.
void sendHeaders()
{
if (!ostr)
{
if (compress)
{
if (compression_method == Poco::DeflatingStreamBuf::STREAM_GZIP)
response.set("Content-Encoding", "gzip");
else if (compression_method == Poco::DeflatingStreamBuf::STREAM_ZLIB)
response.set("Content-Encoding", "deflate");
else
throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse",
ErrorCodes::LOGICAL_ERROR);
response_ostr = &response.send();
deflating_stream.emplace(*response_ostr, compression_method, compression_level);
ostr = &deflating_stream.value();
}
else
{
response_ostr = &response.send();
ostr = response_ostr;
}
}
}
void nextImpl()
{
if (!ostr)
ostr = &response.send();
sendHeaders();
if (!offset())
return;
ostr->write(working_buffer.begin(), offset());
ostr->flush();
@ -47,8 +85,13 @@ private:
}
public:
WriteBufferFromHTTPServerResponse(Poco::Net::HTTPServerResponse & response_, size_t size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<WriteBuffer>(size), response(response_) {}
WriteBufferFromHTTPServerResponse(
Poco::Net::HTTPServerResponse & response_,
bool compress_ = false, /// Если true - выставить заголовок Content-Encoding и сжимать результат.
Poco::DeflatingStreamBuf::StreamType compression_method_ = Poco::DeflatingStreamBuf::STREAM_GZIP, /// Как сжимать результат (gzip, deflate).
size_t size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<WriteBuffer>(size), response(response_),
compress(compress_), compression_method(compression_method_) {}
/** Если данные ещё не были отправлены - отправить хотя бы HTTP заголовки.
* Используйте эту функцию после того, как данные, возможно, были отправлены,
@ -56,8 +99,16 @@ public:
*/
void finalize()
{
if (!ostr)
ostr = &response.send();
sendHeaders();
}
/** Установить уровень сжатия, если данные будут сжиматься.
* Работает только перед тем, как были отправлены HTTP заголовки.
* Иначе - не имеет эффекта.
*/
void setCompressionLevel(int level)
{
compression_level = level;
}
~WriteBufferFromHTTPServerResponse()
@ -68,6 +119,9 @@ public:
try
{
next();
if (deflating_stream)
deflating_stream->close();
}
catch (...)
{

View File

@ -192,6 +192,9 @@ struct Settings
\
/** В целях тестирования exception safety - кидать исключение при каждом выделении памяти с указанной вероятностью. */ \
M(SettingFloat, memory_tracker_fault_probability, 0.) \
\
/** Уровень сжатия - используется, если клиент по HTTP сказал, что он понимает данные, сжатые методом gzip или deflate */ \
M(SettingInt64, http_zlib_compression_level, 3) \
/// Всевозможные ограничения на выполнение запроса.
Limits limits;

View File

@ -33,23 +33,23 @@ namespace ErrorCodes
* и удалённый сервер будет использовать своё значение по-умолчанию.
*/
struct SettingUInt64
template <typename IntType>
struct SettingInt
{
UInt64 value;
IntType value;
bool changed = false;
SettingUInt64(UInt64 x = 0) : value(x) {}
SettingInt(IntType x = 0) : value(x) {}
operator UInt64() const { return value; }
SettingUInt64 & operator= (UInt64 x) { set(x); return *this; }
operator IntType() const { return value; }
SettingInt & operator= (IntType x) { set(x); return *this; }
String toString() const
{
return DB::toString(value);
}
void set(UInt64 x)
void set(IntType x)
{
value = x;
changed = true;
@ -57,28 +57,30 @@ struct SettingUInt64
void set(const Field & x)
{
set(safeGet<UInt64>(x));
set(safeGet<IntType>(x));
}
void set(const String & x)
{
set(parse<UInt64>(x));
set(parse<IntType>(x));
}
void set(ReadBuffer & buf)
{
UInt64 x = 0;
readVarUInt(x, buf);
IntType x = 0;
readVarT(x, buf);
set(x);
}
void write(WriteBuffer & buf) const
{
writeVarUInt(value, buf);
writeVarT(value, buf);
}
};
typedef SettingUInt64 SettingBool;
using SettingUInt64 = SettingInt<UInt64>;
using SettingInt64 = SettingInt<Int64>;
using SettingBool = SettingUInt64;
/** В отличие от SettingUInt64, поддерживает значение 'auto' - количество процессорных ядер без учёта SMT.

View File

@ -109,7 +109,7 @@ void Settings::loadSettingsFromConfig(const String & path, const Poco::Util::Abs
/// Если выставлен флаг check_readonly, в настройках выставлено readonly, но пришли какие-то изменения кинуть исключение.
void Settings::deserialize(ReadBuffer & buf)
{
bool readonly = limits.readonly == 1; /// Если readonly = 2, то можно менять настройки.
auto before_readonly = limits.readonly;
while (true)
{
@ -120,7 +120,8 @@ void Settings::deserialize(ReadBuffer & buf)
if (name.empty())
break;
if (!readonly)
/// Если readonly = 2, то можно менять настройки, кроме настройки readonly.
if (before_readonly == 0 || (before_readonly == 2 && name != "readonly"))
set(name, buf);
else
ignore(name, buf);

View File

@ -1,4 +1,5 @@
#include <iomanip>
#include <Poco/InflatingStream.h>
#include <Poco/Net/HTTPBasicCredentials.h>
@ -29,6 +30,7 @@ namespace DB
namespace ErrorCodes
{
extern const int READONLY;
extern const int UNKNOWN_COMPRESSION_METHOD;
}
@ -38,7 +40,6 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
HTMLForm params(request);
std::istream & istr = request.stream();
bool readonly = request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET;
BlockInputStreamPtr query_plan;
@ -50,9 +51,34 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
if (!query_param.empty())
query_param += '\n';
/// Если указано compress, то будем сжимать результат.
used_output.out = new WriteBufferFromHTTPServerResponse(response);
/** Клиент может указать поддерживаемый метод сжатия (gzip или deflate) в HTTP-заголовке.
*/
String http_response_compression_methods = request.get("Accept-Encoding", "");
bool http_response_compress = false;
Poco::DeflatingStreamBuf::StreamType http_response_compression_method {};
if (!http_response_compression_methods.empty())
{
/// Мы поддерживаем gzip или deflate. Если клиент поддерживает оба, то предпочитается gzip.
/// NOTE Парсинг списка методов слегка некорректный.
if (std::string::npos != http_response_compression_methods.find("gzip"))
{
http_response_compress = true;
http_response_compression_method = Poco::DeflatingStreamBuf::STREAM_GZIP;
}
else if (std::string::npos != http_response_compression_methods.find("deflate"))
{
http_response_compress = true;
http_response_compression_method = Poco::DeflatingStreamBuf::STREAM_ZLIB;
}
}
used_output.out = new WriteBufferFromHTTPServerResponse(response, http_response_compress, http_response_compression_method);
/** Клиент может указать compress в query string.
* В этом случае, результат сжимается несовместимым алгоритмом для внутреннего использования и этот факт не отражается в HTTP заголовках.
*/
if (parse<bool>(params.get("compress", "0")))
used_output.out_maybe_compressed = new CompressedWriteBuffer(*used_output.out);
else
@ -80,10 +106,43 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
context.setCurrentQueryId(query_id);
SharedPtr<ReadBuffer> in_param = new ReadBufferFromString(query_param);
SharedPtr<ReadBuffer> in_post = new ReadBufferFromIStream(istr);
/// Данные POST-а могут быть сжаты алгоритмом, указанным в Content-Encoding заголовке.
String http_request_compression_method_str = request.get("Content-Encoding", "");
bool http_request_decompress = false;
Poco::InflatingStreamBuf::StreamType http_request_compression_method {};
if (!http_request_compression_method_str.empty())
{
if (http_request_compression_method_str == "gzip")
{
http_request_decompress = true;
http_request_compression_method = Poco::InflatingStreamBuf::STREAM_GZIP;
}
else if (http_request_compression_method_str == "deflate")
{
http_request_decompress = true;
http_request_compression_method = Poco::InflatingStreamBuf::STREAM_ZLIB;
}
else
throw Exception("Unknown Content-Encoding of HTTP request: " + http_request_compression_method_str,
ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}
std::experimental::optional<Poco::InflatingInputStream> decompressing_stream;
SharedPtr<ReadBuffer> in_post;
if (http_request_decompress)
{
decompressing_stream.emplace(istr, http_request_compression_method);
in_post = new ReadBufferFromIStream(decompressing_stream.value());
}
else
in_post = new ReadBufferFromIStream(istr);
/// Также данные могут быть сжаты несовместимым алгоритмом для внутреннего использования - это определяется параметром query_string.
SharedPtr<ReadBuffer> in_post_maybe_compressed;
/// Если указано decompress, то будем разжимать то, что передано POST-ом.
if (parse<bool>(params.get("decompress", "0")))
in_post_maybe_compressed = new CompressedReadBuffer(*in_post);
else
@ -91,6 +150,7 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
SharedPtr<ReadBuffer> in;
/// Поддержка "внешних данных для обработки запроса".
if (0 == strncmp(request.getContentType().data(), "multipart/form-data", strlen("multipart/form-data")))
{
in = in_param;
@ -98,7 +158,7 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
params.load(request, istr, handler);
/// Удаляем уже нененужные параметры из хранилища, чтобы впоследствии не перепутать их с натройками контекста и параметрами запроса.
/// Удаляем уже нененужные параметры из хранилища, чтобы впоследствии не перепутать их с наcтройками контекста и параметрами запроса.
for (const auto & it : handler.names)
{
params.erase(it + "_format");
@ -109,7 +169,29 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
else
in = new ConcatReadBuffer(*in_param, *in_post_maybe_compressed);
/// Настройки могут быть переопределены в запросе.
/** Настройки могут быть переопределены в запросе.
* Некоторые параметры (database, default_format, и все что использовались выше),
* не относятся к обычным настройкам (Settings).
*
* Среди настроек есть также readonly.
* readonly = 0 - можно выполнять любые запросы и изменять любые настройки
* readonly = 1 - можно выполнять только запросы на чтение, нельзя изменять настройки
* readonly = 2 - можно выполнять только запросы на чтение, можно изменять настройки кроме настройки readonly
*
* Заметим, что в запросе, если до этого readonly было равно 0,
* пользователь может изменить любые настройки и одновременно выставить readonly в другое значение.
*/
auto & limits = context.getSettingsRef().limits;
/// Если метод GET, то это эквивалентно настройке readonly, выставленной в ненулевое значение.
if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET)
{
if (limits.readonly == 0)
limits.readonly = 2;
}
auto readonly_before_query = limits.readonly;
for (Poco::Net::NameValueCollection::ConstIterator it = params.begin(); it != params.end(); ++it)
{
if (it->first == "database")
@ -120,10 +202,6 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
{
context.setDefaultFormat(it->second);
}
else if (readonly && it->first == "readonly")
{
throw Exception("Setting 'readonly' cannot be overrided in readonly mode", ErrorCodes::READONLY);
}
else if (it->first == "query"
|| it->first == "compress"
|| it->first == "decompress"
@ -133,12 +211,22 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
|| it->first == "query_id")
{
}
else /// Все неизвестные параметры запроса рассматриваются, как настройки.
else
{
/// Все остальные параметры запроса рассматриваются, как настройки.
if (readonly_before_query == 1)
throw Exception("Cannot override setting (" + it->first + ") in readonly mode", ErrorCodes::READONLY);
if (readonly_before_query && it->first == "readonly")
throw Exception("Setting 'readonly' cannot be overrided in readonly mode", ErrorCodes::READONLY);
context.setSetting(it->first, it->second);
}
}
if (readonly)
context.getSettingsRef().limits.readonly = true;
if (http_response_compress)
used_output.out->setCompressionLevel(context.getSettingsRef().http_zlib_compression_level);
context.setInterface(Context::Interface::HTTP);

View File

@ -0,0 +1,47 @@
0
1
2
3
4
5
6
7
8
9
0
1
2
3
4
5
6
7
8
9
0
1
2
3
4
5
6
7
8
9
0
1
2
3
4
5
6
7
8
9
< Content-Encoding: gzip
< Content-Encoding: deflate
< Content-Encoding: gzip
1
1
Hello, world
Hello, world

View File

@ -0,0 +1,18 @@
#!/bin/bash
curl -sS 'http://localhost:8123/' -d 'SELECT number FROM system.numbers LIMIT 10';
curl -sS 'http://localhost:8123/' -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 10' | gzip -d;
curl -sS 'http://localhost:8123/' -H 'Accept-Encoding: gzip, deflate' -d 'SELECT number FROM system.numbers LIMIT 10' | gzip -d;
curl -sS 'http://localhost:8123/' -H 'Accept-Encoding: zip, eflate' -d 'SELECT number FROM system.numbers LIMIT 10';
curl -vsS 'http://localhost:8123/' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
curl -vsS 'http://localhost:8123/' -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
curl -vsS 'http://localhost:8123/' -H 'Accept-Encoding: deflate' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
curl -vsS 'http://localhost:8123/' -H 'Accept-Encoding: gzip, deflate' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
curl -vsS 'http://localhost:8123/' -H 'Accept-Encoding: zip, eflate' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
echo "SELECT 1" | curl -sS --data-binary @- 'http://localhost:8123/';
echo "SELECT 1" | gzip -c | curl -sS --data-binary @- -H 'Content-Encoding: gzip' 'http://localhost:8123/';
echo "'Hello, world'" | curl -sS --data-binary @- 'http://localhost:8123/?query=SELECT';
echo "'Hello, world'" | gzip -c | curl -sS --data-binary @- -H 'Content-Encoding: gzip' 'http://localhost:8123/?query=SELECT';

View File

@ -0,0 +1,2 @@
1 Hello
2 World

View File

@ -0,0 +1,3 @@
#!/bin/bash
echo -ne '1,Hello\n2,World\n' | curl -sSF 'file=@-' 'http://localhost:8123/?query=SELECT+*+FROM+file&file_format=CSV&file_types=UInt8,String';

View File

@ -0,0 +1,22 @@
name value changed
max_rows_to_read 10000 1
readonly 0 0
name value changed
max_rows_to_read 10000 1
readonly 2 1
name value changed
max_rows_to_read 10000 1
readonly 1 1
name value changed
max_rows_to_read 10000 1
readonly 2 1
Ok
Ok
0
0
Ok
Ok

View File

@ -0,0 +1,20 @@
#!/bin/bash
# При POST можно делать что угодно.
curl -sS "http://localhost:8123/?query=SELECT+*+FROM+system.settings+WHERE+name+IN+('readonly','max_rows_to_read')&max_rows_to_read=10000&default_format=PrettySpaceNoEscapes" -d' '
# При GET выставляется readonly = 2.
curl -sS "http://localhost:8123/?query=SELECT+*+FROM+system.settings+WHERE+name+IN+('readonly','max_rows_to_read')&max_rows_to_read=10000&default_format=PrettySpaceNoEscapes"
# Можно самому усилить readonly и при этом изменить какие-то ещё настройки.
curl -sS "http://localhost:8123/?query=SELECT+*+FROM+system.settings+WHERE+name+IN+('readonly','max_rows_to_read')&readonly=1&max_rows_to_read=10000&default_format=PrettySpaceNoEscapes" -d' '
curl -sS "http://localhost:8123/?query=SELECT+*+FROM+system.settings+WHERE+name+IN+('readonly','max_rows_to_read')&readonly=2&max_rows_to_read=10000&default_format=PrettySpaceNoEscapes" -d' '
curl -vsS "http://localhost:8123/?query=DROP+TABLE+IF+EXISTS+test.nonexistent" 2>&1 | grep -q '500 Internal Server Error' && echo 'Ok' || echo 'Fail'
curl -vsS "http://localhost:8123/?readonly=0&query=DROP+TABLE+IF+EXISTS+test.nonexistent" 2>&1 | grep -q '500 Internal Server Error' && echo 'Ok' || echo 'Fail'
curl -sS "http://localhost:8123/?query=DROP+TABLE+IF+EXISTS+test.nonexistent" -d ' ' | wc -l
curl -sS "http://localhost:8123/?readonly=0&query=DROP+TABLE+IF+EXISTS+test.nonexistent" -d ' ' | wc -l
curl -vsS "http://localhost:8123/?readonly=1&query=DROP+TABLE+IF+EXISTS+test.nonexistent" -d ' ' 2>&1 | grep -q '500 Internal Server Error' && echo 'Ok' || echo 'Fail'
curl -vsS "http://localhost:8123/?readonly=2&query=DROP+TABLE+IF+EXISTS+test.nonexistent" -d ' ' 2>&1 | grep -q '500 Internal Server Error' && echo 'Ok' || echo 'Fail'