Add functional test, fix error. [#CLICKHOUSE-2070]

This commit is contained in:
Vitaliy Lyudvichenko 2017-03-02 12:35:17 +03:00 committed by alexey-milovidov
parent b5b863a6f4
commit 44c9bad289
8 changed files with 217 additions and 25 deletions

View File

@ -32,6 +32,8 @@ public:
return curr_buffer;
}
~CascadeWriteBuffer();
private:
WriteBuffer * setNextBuffer();

View File

@ -8,14 +8,13 @@
namespace DB
{
class ReadBufferFromMemoryWriteBuffer;
/// Stores data in memory chunks, size of cunks are exponentially increasing duting write
/// Data could be reread after write
/// Stores data in memory chunks; chunk sizes grow exponentially during write
/// Written data can be reread after write
class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer
{
public:
/// Use max_total_size_ = 0 for unlimited storage
MemoryWriteBuffer(
size_t max_total_size_ = 0,
size_t initial_chunk_size_ = DBMS_DEFAULT_BUFFER_SIZE,
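For orientation, a minimal write-then-reread sketch of the interface declared above; the 16-byte limit, the function name, and the use of WriteBufferFromString are illustrative, not part of the commit.

#include <string>
#include <DB/IO/MemoryReadWriteBuffer.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/copyData.h>

void memoryWriteBufferSketch()
{
    /// At most 16 bytes may be stored; a write beyond that throws an exception
    /// with code CURRENT_WRITE_BUFFER_IS_EXHAUSTED.
    DB::MemoryWriteBuffer buf(16);

    const std::string data = "0123456789";
    buf.write(data.data(), data.size());

    std::string out;
    {
        /// tryGetReadBuffer() (from IReadableWriteBuffer) finalizes the write side
        /// and exposes the stored chunks for reading.
        auto rdbuf = buf.tryGetReadBuffer();
        DB::WriteBufferFromString out_buf(out);
        DB::copyData(*rdbuf, out_buf);
    }   /// out_buf is finished here; out now holds exactly the written bytes.
}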

View File

@ -36,8 +36,7 @@ void CascadeWriteBuffer::nextImpl()
{
if (curr_buffer_num < num_sources && e.code() == ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED)
{
/// actualize position of old buffer (it was reset by WriteBuffer::next)
curr_buffer->position() = position();
/// TODO: protocol should require set(position(), 0) before Exception
/// good situation, fetch next WriteBuffer
++curr_buffer_num;
@ -47,13 +46,17 @@ void CascadeWriteBuffer::nextImpl()
throw;
}
set(curr_buffer->buffer().begin(), curr_buffer->buffer().size());
set(curr_buffer->position(), curr_buffer->buffer().end() - curr_buffer->position());
// std::cerr << "CascadeWriteBuffer a count=" << count() << " bytes=" << bytes << " offset=" << offset()
// << " bytes+size=" << bytes + buffer().size() << "\n";
}
void CascadeWriteBuffer::getResultBuffers(WriteBufferPtrs & res)
{
/// Sync position with underlying buffer before invalidating
curr_buffer->position() = position();
res = std::move(prepared_sources);
curr_buffer = nullptr;
@ -78,7 +81,20 @@ WriteBuffer * CascadeWriteBuffer::setNextBuffer()
if (!res)
throw Exception("Required WriteBuffer is not created", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
/// Check that returned buffer isn't empty
if (!res->hasPendingData())
res->next();
return res;
}
CascadeWriteBuffer::~CascadeWriteBuffer()
{
/// Sync position with underlying buffer before exit
if (curr_buffer)
curr_buffer->position() = position();
}
}
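To make the control flow above easier to follow: the first constructor argument of CascadeWriteBuffer is a list of ready-made buffers, the second a list of lazy constructors; setNextBuffer() either takes the next ready buffer or invokes the next constructor once the current stage throws CURRENT_WRITE_BUFFER_IS_EXHAUSTED. A hedged sketch of that overflow path, modeled on the functional test added by this commit (sizes and names are illustrative):

#include <string>
#include <DB/IO/CascadeWriteBuffer.h>
#include <DB/IO/MemoryReadWriteBuffer.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/copyData.h>
#include <DB/Common/typeid_cast.h>

using namespace DB;

void cascadeOverflowSketch()
{
    std::string overflow_target;
    auto final_buf = std::make_shared<WriteBufferFromString>(overflow_target);

    CascadeWriteBuffer cascade(
        /// Ready-made first stage: accepts at most 64 bytes.
        { std::make_shared<MemoryWriteBuffer>(64) },
        /// Lazy second stage: constructed only after the first stage is exhausted.
        { [final_buf] (const WriteBufferPtr & prev_buf)
          {
              /// Drain what the memory stage accumulated, then keep writing into final_buf.
              auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get());
              copyData(*prev_memory_buffer->tryGetReadBuffer(), *final_buf);
              return final_buf;
          } });

    /// 256 bytes overflow the 64-byte stage; the remainder is written through final_buf,
    /// so overflow_target holds the whole payload once final_buf is finished.
    std::string data(256, 'x');
    cascade.write(data.data(), data.size());
}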

View File

@ -73,6 +73,7 @@ private:
Position end_pos;
};
MemoryWriteBuffer::MemoryWriteBuffer(size_t max_total_size_, size_t initial_chunk_size_, double growth_rate_)
: WriteBuffer(nullptr, 0), max_total_size(max_total_size_), initial_chunk_size(initial_chunk_size_), growth_rate(growth_rate_)
{
@ -110,7 +111,10 @@ void MemoryWriteBuffer::addChunk()
next_chunk_size = max_total_size - total_chunks_size;
if (0 == next_chunk_size)
{
set(position(), 0);
throw Exception("MemoryWriteBuffer limit is exhausted", ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED);
}
}
Position begin = reinterpret_cast<Position>(Allocator<false>().alloc(next_chunk_size));
@ -118,6 +122,8 @@ void MemoryWriteBuffer::addChunk()
total_chunks_size += next_chunk_size;
set(chunk_tail->begin(), chunk_tail->size());
// std::cerr << "MemoryWriteBuffer a count=" << count() << " bytes=" << bytes << " bytes+size=" << bytes + buffer().size() << " total_chunks_size="
// << total_chunks_size << "\n";
}
std::shared_ptr<ReadBuffer> MemoryWriteBuffer::getReadBufferImpl()
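The set(position(), 0) added before the throw above is the handover contract that the TODO in CascadeWriteBuffer::nextImpl() refers to: an exhausted buffer must leave itself with an empty working buffer at its current position, so the wrapping CascadeWriteBuffer reads a consistent position() when it catches the exception. A hedged sketch of another buffer honouring the same contract (class name and include paths are assumptions, not part of the commit):

#include <cstddef>
#include <DB/IO/WriteBuffer.h>
#include <DB/Common/Exception.h>

namespace DB { namespace ErrorCodes { extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED; } }

/// Illustrative only: a fixed-capacity buffer that signals exhaustion the same way
/// MemoryWriteBuffer does, so it could serve as a CascadeWriteBuffer stage.
class FixedWriteBuffer : public DB::WriteBuffer
{
public:
    FixedWriteBuffer(char * data, size_t size) : DB::WriteBuffer(data, size) {}

private:
    void nextImpl() override
    {
        /// Keep position() at the end of the written data, make the working buffer empty,
        /// then report that no more data can be accepted.
        set(position(), 0);
        throw DB::Exception("FixedWriteBuffer is exhausted",
                            DB::ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED);
    }
};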

View File

@ -1,15 +1,21 @@
#include <gtest/gtest.h>
#include <stdexcept>
#include <Poco/File.h>
#include <DB/IO/CascadeWriteBuffer.h>
#include <DB/IO/MemoryReadWriteBuffer.h>
#include <DB/IO/WriteBufferFromTemporaryFile.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/copyData.h>
#include <DB/Common/typeid_cast.h>
using namespace DB;
@ -21,7 +27,6 @@ static std::string makeTestArray(size_t size)
return res;
}
static void testCascadeBufferRedability(
std::string data,
CascadeWriteBuffer::WriteBufferPtrs && arg1,
@ -124,6 +129,51 @@ catch (...)
}
static void checkHTTPHandlerCase(size_t input_size, size_t memory_buffer_size)
{
std::string src = makeTestArray(input_size);
std::string res_str(DBMS_DEFAULT_BUFFER_SIZE, '\0');
{
auto res_buf = std::make_shared<WriteBufferFromString>(res_str);
CascadeWriteBuffer cascade(
{
std::make_shared<MemoryWriteBuffer>(memory_buffer_size)
},
{
[res_buf] (const WriteBufferPtr & prev_buf)
{
auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get());
auto rdbuf = prev_memory_buffer->tryGetReadBuffer();
copyData(*rdbuf, *res_buf);
return res_buf;
}
});
cascade.write(&src[0], src.size());
EXPECT_EQ(cascade.count(), src.size());
}
ASSERT_EQ(src.size(), res_str.size());
ASSERT_TRUE(src == res_str);
}
TEST(CascadeWriteBuffer, HTTPHandlerCase)
{
std::vector<size_t> sizes{1, 500000, DBMS_DEFAULT_BUFFER_SIZE, 1000000, 1451424, 1500000, 2000000, 2500000};
for (size_t input_size : sizes)
{
for (size_t memory_buffer_size : sizes)
{
if (input_size > memory_buffer_size)
checkHTTPHandlerCase(input_size, memory_buffer_size);
}
}
}
static void checkMemoryWriteBuffer(std::string data, MemoryWriteBuffer && buf)
{
buf.write(&data[0], data.size());
@ -160,6 +210,8 @@ TEST(MemoryWriteBuffer, WriteAndReread)
EXPECT_THROW(buf.write(&data[0], data.size()), DB::Exception);
}
}
checkMemoryWriteBuffer(makeTestArray(1451424), MemoryWriteBuffer(1451424));
}

View File

@ -131,8 +131,8 @@ void HTTPHandler::pushDelayedResults(Output & used_output)
cascade_buffer->getResultBuffers(write_buffers);
if (write_buffers.size() != 2)
throw Exception("Exactly 2 buffers are required to overwrite result into HTTP response", ErrorCodes::LOGICAL_ERROR);
if (write_buffers.empty())
throw Exception("At least one buffer is expected to overwrite result into HTTP response", ErrorCodes::LOGICAL_ERROR);
for (auto & write_buf : write_buffers)
{
@ -225,24 +225,33 @@ void HTTPHandler::processQuery(
/// compressed using internal algorithm. This is not reflected in HTTP headers.
bool internal_compression = params.getParsed<bool>("compress", false);
size_t response_buffer_size = params.getParsed<size_t>("buffer_size", DBMS_DEFAULT_BUFFER_SIZE);
response_buffer_size = response_buffer_size ? response_buffer_size : DBMS_DEFAULT_BUFFER_SIZE;
/// At a minimum, we should postpone sending the first buffer_size result bytes
size_t buffer_size_total = std::max(
params.getParsed<size_t>("buffer_size", DBMS_DEFAULT_BUFFER_SIZE), static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE));
bool result_buffer_overflow_to_disk = params.get("result_buffer_on_overflow", "http") == "disk";
/// If it is specified, the whole result will be buffered.
/// The first ~buffer_size bytes will be buffered in memory; the remaining bytes will be stored in a temporary file.
bool buffer_until_eof = params.getParsed<bool>("wait_end_of_query", false);
size_t buffer_size_http = DBMS_DEFAULT_BUFFER_SIZE;
size_t buffer_size_memory = (buffer_size_total > buffer_size_http) ? buffer_size_total : 0;
used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(
response, client_supports_http_compression, http_response_compression_method, response_buffer_size);
response, client_supports_http_compression, http_response_compression_method, buffer_size_http);
if (internal_compression)
used_output.out_maybe_compressed = std::make_shared<CompressedWriteBuffer>(*used_output.out);
else
used_output.out_maybe_compressed = used_output.out;
if (response_buffer_size > 0)
if (buffer_size_memory > 0 || buffer_until_eof)
{
CascadeWriteBuffer::WriteBufferPtrs concat_buffers1{ std::make_shared<MemoryWriteBuffer>(response_buffer_size) };
CascadeWriteBuffer::WriteBufferConstructors concat_buffers2{};
CascadeWriteBuffer::WriteBufferPtrs cascade_buffer1;
CascadeWriteBuffer::WriteBufferConstructors cascade_buffer2;
if (result_buffer_overflow_to_disk)
if (buffer_size_memory > 0)
cascade_buffer1.emplace_back(std::make_shared<MemoryWriteBuffer>(buffer_size_memory));
if (buffer_until_eof)
{
std::string tmp_path_template = context.getTemporaryPath() + "http_buffers/" + escapeForFileName(user) + ".XXXXXX";
@ -251,26 +260,27 @@ void HTTPHandler::processQuery(
return WriteBufferFromTemporaryFile::create(tmp_path_template);
};
concat_buffers2.emplace_back(std::move(create_tmp_disk_buffer));
cascade_buffer2.emplace_back(std::move(create_tmp_disk_buffer));
}
else
{
auto rewrite_memory_buffer_and_forward = [next_buffer = used_output.out_maybe_compressed] (const WriteBufferPtr & prev_buf)
auto push_memory_buffer_and_continue = [next_buffer = used_output.out_maybe_compressed] (const WriteBufferPtr & prev_buf)
{
auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get());
if (!prev_memory_buffer)
throw Exception("Expected MemoryWriteBuffer", ErrorCodes::LOGICAL_ERROR);
copyData(*prev_memory_buffer->tryGetReadBuffer(), *next_buffer);
auto rdbuf = prev_memory_buffer->tryGetReadBuffer();
copyData(*rdbuf, *next_buffer);
return next_buffer;
};
concat_buffers2.emplace_back(rewrite_memory_buffer_and_forward);
cascade_buffer2.emplace_back(push_memory_buffer_and_continue);
}
used_output.out_maybe_delayed_and_compressed = std::make_shared<CascadeWriteBuffer>(
std::move(concat_buffers1), std::move(concat_buffers2));
std::move(cascade_buffer1), std::move(cascade_buffer2));
}
else
{
@ -307,7 +317,7 @@ void HTTPHandler::processQuery(
/// 'decompress' query parameter.
std::unique_ptr<ReadBuffer> in_post_maybe_compressed;
bool in_post_compressed = false;
if (parse<bool>(params.get("decompress", "0")))
if (params.getParsed<bool>("decompress", false))
{
in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post);
in_post_compressed = true;
@ -360,7 +370,7 @@ void HTTPHandler::processQuery(
auto readonly_before_query = limits.readonly;
NameSet reserved_param_names{"query", "compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace",
"buffer_size", "result_buffer_size", "result_buffer_on_overflow"
"buffer_size", "wait_end_of_query"
};
for (auto it = params.begin(); it != params.end(); ++it)
@ -435,7 +445,10 @@ void HTTPHandler::processQuery(
[&response] (const String & content_type) { response.setContentType(content_type); });
if (used_output.hasDelayed())
{
/// TODO: set Content-Length if possible (?)
pushDelayedResults(used_output);
}
/// Send HTTP headers with code 200 if no exception happened and the data is still not sent to
/// the client.
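As a worked example of the buffering parameters above (an illustrative restatement, not part of the commit; it assumes DBMS_DEFAULT_BUFFER_SIZE is 1 MiB = 1048576 bytes, which is consistent with the corner sizes used by the test script below):

#include <cstddef>
#include <string>

/// Which output chain processQuery builds for given ?buffer_size= and ?wait_end_of_query= values.
std::string delayedOutputChain(size_t buffer_size, bool wait_end_of_query)
{
    const size_t buffer_size_http = 1048576;   /// socket-facing buffer, always this size
    const size_t buffer_size_memory = buffer_size > buffer_size_http ? buffer_size : 0;

    if (buffer_size_memory == 0 && !wait_end_of_query)
        return "no cascade: write directly to WriteBufferFromHTTPServerResponse";

    if (!wait_end_of_query)   /// e.g. ?buffer_size=2500000
        return "cascade: MemoryWriteBuffer(buffer_size); on overflow copy its contents to the HTTP buffer and continue there";

    if (buffer_size_memory > 0)   /// e.g. ?buffer_size=2500000&wait_end_of_query=1
        return "cascade: MemoryWriteBuffer(buffer_size), then a temporary file; flushed only by pushDelayedResults";

    /// ?wait_end_of_query=1 with default buffer_size
    return "cascade: temporary file only; flushed only by pushDelayedResults";
}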

View File

@ -0,0 +1,104 @@
#!/bin/bash
set -e
max_block_size=10
URL='http://localhost:8123/'
function query {
echo "SELECT toUInt8(intHash64(number)) FROM system.numbers LIMIT $1 FORMAT RowBinary"
#echo "SELECT toUInt8(number) FROM system.numbers LIMIT $1 FORMAT RowBinary"
}
function ch_url() {
curl -sS "$URL?max_block_size=$max_block_size&$1" -d "`query $2`"
}
# Check correct exception handling
exception_pattern="Code: 307, e\.displayText() = DB::Exception:[[:print:]]* e\.what() = DB::Exception$"
function check_only_exception() {
local res=`ch_url "$1" "$2"`
#(echo "$res")
#(echo "$res" | wc -l)
#(echo "$res" | grep -c "^$exception_pattern")
[[ `echo "$res" | wc -l` -eq 1 ]] && echo OK || echo FAIL
[[ $(echo "$res" | grep -c "^$exception_pattern") -eq 1 ]] && echo OK || echo FAIL
}
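# With wait_end_of_query=0 and a buffer smaller than the result, part of the data has already
# been streamed when the limit triggers, so the exception can only be appended as the last line.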
function check_last_line_exception() {
local res=`ch_url "$1" "$2"`
echo "$res" > res
#echo "$res" | wc -c
#echo "$res" | tail -n -2
[[ $(echo "$res" | tail -n -1 | grep -c "$exception_pattern") -eq 1 ]] && echo OK || echo FAIL
[[ $(echo "$res" | head -n -1 | grep -c "$exception_pattern") -eq 0 ]] && echo OK || echo FAIL
}
function check_exception_handling() {
check_only_exception "max_result_bytes=1000" 1001
check_only_exception "max_result_bytes=1000&wait_end_of_query=1" 1001
echo
check_only_exception "max_result_bytes=1048576&buffer_size=1048576&wait_end_of_query=0" 1048577
check_only_exception "max_result_bytes=1048576&buffer_size=1048576&wait_end_of_query=1" 1048577
echo
check_only_exception "max_result_bytes=1500000&buffer_size=2500000&wait_end_of_query=0" 1500001
check_only_exception "max_result_bytes=1500000&buffer_size=1500000&wait_end_of_query=1" 1500001
echo
check_only_exception "max_result_bytes=4000000&buffer_size=2000000&wait_end_of_query=1" 5000000
check_only_exception "max_result_bytes=4000000&wait_end_of_query=1" 5000000
check_last_line_exception "max_result_bytes=4000000&buffer_size=2000000&wait_end_of_query=0" 5000000
}
#check_exception_handling
max_block_size=500000
corner_sizes="1048576 `seq 500000 1000000 3500000`"
# Check HTTP results with clickhouse-client in normal case
function cmp_cli_and_http() {
clickhouse-client -q "`query $1`" > res1
ch_url "buffer_size=$2&wait_end_of_query=0" "$1" > res2
ch_url "buffer_size=$2&wait_end_of_query=1" "$1" > res3
cmp res1 res2
cmp res1 res3
rm -rf res1 res2 res3
}
function check_cli_and_http() {
for input_size in $corner_sizes; do
for buffer_size in $corner_sizes; do
#echo "$input_size" "$buffer_size"
cmp_cli_and_http "$input_size" "$buffer_size"
done
done
}
check_cli_and_http
# Check HTTP internal compression in normal case (clickhouse-compressor required)
function cmp_http_compression() {
clickhouse-client -q "`query $1`" > res0
ch_url 'compress=1' $1 | clickhouse-compressor --decompress > res1
ch_url "compress=1&buffer_size=$2&wait_end_of_query=0" $1 | clickhouse-compressor --decompress > res2
ch_url "compress=1&buffer_size=$2&wait_end_of_query=1" $1 | clickhouse-compressor --decompress > res3
cmp res0 res1
cmp res1 res2
cmp res1 res3
rm -rf res0 res1 res2 res3
}
function check_http_compression() {
for input_size in $corner_sizes; do
for buffer_size in $corner_sizes; do
#echo "$input_size" "$buffer_size"
cmp_http_compression "$input_size" "$buffer_size"
done
done
}
has_compressor=$(command -v clickhouse-compressor &>/dev/null && echo 1 || echo 0)
[[ $has_compressor -eq 1 ]] && check_http_compression || true