Implemented re-readable buffers, add unit tests. [#CLICKHOUSE-2070]

This commit is contained in:
Vitaliy Lyudvichenko 2017-02-09 13:10:13 +03:00 committed by alexey-milovidov
parent 3e48021f81
commit 19a2195710
17 changed files with 844 additions and 57 deletions

View File

@ -45,3 +45,10 @@ endif ()
if (NOT AARCH64)
add_subdirectory (libcpuid)
endif ()
if (ENABLE_TESTS)
add_subdirectory (googletest)
# avoid problems with <regexp.h>
target_compile_definitions (gtest INTERFACE -DGTEST_HAS_POSIX_RE=0)
target_include_directories (gtest INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/googletest/include)
endif()

View File

@ -33,6 +33,7 @@ endif ()
find_package (Threads)
add_subdirectory (src)
add_subdirectory (tests/unit_tests)
add_library(string_utils
include/DB/Common/StringUtils.h

View File

@ -5,6 +5,8 @@
#include <Poco/Net/HTTPRequest.h>
#include <Poco/URI.h>
#include <DB/IO/ReadHelpers.h>
/** Somehow, in case of POST, Poco::Net::HTMLForm doesn't read parameters from URL, only from body.
* This helper allows to read parameters just from URL.
@ -23,4 +25,18 @@ struct HTMLForm : public Poco::Net::HTMLForm
std::istringstream istr(uri.getRawQuery());
readUrl(istr);
}
template <typename T>
T getParsed(const std::string & key, T default_value)
{
auto it = find(key);
return (it != end()) ? DB::parse<T>(it->second) : default_value;
}
template <typename T>
T getParsed(const std::string & key)
{
return DB::parse<T>(get(key));
}
};

View File

@ -0,0 +1,125 @@
#include <functional>
#include <forward_list>
#include <iostream>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/Core/Defines.h>
#include <DB/Common/Allocator.h>
#include <DB/Common/Exception.h>
#include <Poco/File.h>
#include <Poco/Path.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
class CascadeWriteBuffer : public WriteBuffer
{
public:
using WriteBufferPtrs = std::vector<WriteBufferPtr>;
using WriteBufferConstructor = std::function<WriteBufferPtr (const WriteBufferPtr & prev_buf)>;
using WriteBufferConstructors = std::vector<WriteBufferConstructor>;
CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_ = {});
void nextImpl() override;
/// Should be called once
void getResultBuffers(WriteBufferPtrs & res);
private:
WriteBuffer * getNextBuffer();
WriteBufferPtrs prepared_sources;
WriteBufferConstructors lazy_sources;
size_t first_lazy_source_num;
size_t num_sources;
WriteBuffer * curr_buffer;
size_t curr_buffer_num;
};
class ReadBufferFromMemoryWriteBuffer;
class ReadBufferFromTemporaryWriteBuffer;
struct IReadableWriteBuffer
{
/// Creates read buffer from current write buffer
/// Returned buffer points to the first byte of original buffer and finds with current position.
/// Original stream becomes invalid.
virtual std::shared_ptr<ReadBuffer> getReadBuffer() = 0;
virtual ~IReadableWriteBuffer() {}
};
/// Allow to write large data into memory
class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer
{
public:
MemoryWriteBuffer(
size_t max_total_size_ = 0,
size_t initial_chunk_size_ = DBMS_DEFAULT_BUFFER_SIZE,
double growth_rate_ = 2.0);
void nextImpl() override;
std::shared_ptr<ReadBuffer> getReadBuffer() override;
~MemoryWriteBuffer() override;
protected:
const size_t max_total_size;
const size_t initial_chunk_size;
const double growth_rate;
using Container = std::forward_list<BufferBase::Buffer>;
Container chunk_list;
Container::iterator chunk_tail;
size_t total_chunks_size = 0;
void addChunk();
friend class ReadBufferFromMemoryWriteBuffer;
};
/// Rereadable WriteBuffer, could be used as disk buffer
/// Creates unique temporary in directory (and directory itself)
class WriteBufferFromTemporaryFile : public WriteBufferFromFile, public IReadableWriteBuffer
{
public:
using Ptr = std::shared_ptr<WriteBufferFromTemporaryFile>;
/// path_template examle "/opt/clickhouse/tmp/data.XXXXXX"
static Ptr create(const std::string & path_template_);
std::shared_ptr<ReadBuffer> getReadBuffer() override;
~WriteBufferFromTemporaryFile() override;
protected:
WriteBufferFromTemporaryFile(int fd, const std::string & tmp_path)
: WriteBufferFromFile(fd, tmp_path)
{}
};
}

View File

@ -2,6 +2,7 @@
#include <cstring>
#include <algorithm>
#include <memory>
#include <DB/Common/Exception.h>
#include <DB/IO/BufferBase.h>
@ -162,4 +163,7 @@ private:
};
using ReadBufferPtr = std::shared_ptr<ReadBuffer>;
}

View File

@ -20,7 +20,7 @@ namespace DB
*/
class ReadBufferFromFile : public ReadBufferFromFileDescriptor
{
private:
protected:
std::string file_name;
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead};
@ -29,7 +29,7 @@ public:
char * existing_memory = nullptr, size_t alignment = 0);
/// Use pre-opened file descriptor.
ReadBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
ReadBufferFromFile(int fd, const std::string & original_file_name = {}, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0);
~ReadBufferFromFile() override;

View File

@ -2,6 +2,8 @@
#include <algorithm>
#include <cstring>
#include <memory>
#include <iostream>
#include <DB/Common/Exception.h>
#include <DB/IO/BufferBase.h>
@ -96,4 +98,7 @@ private:
};
using WriteBufferPtr = std::shared_ptr<WriteBuffer>;
}

View File

@ -24,7 +24,7 @@ namespace DB
*/
class WriteBufferFromFile : public WriteBufferFromFileDescriptor
{
private:
protected:
std::string file_name;
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForWrite};
@ -40,9 +40,8 @@ public:
/// Use pre-opened file descriptor.
WriteBufferFromFile(
int fd,
const std::string & original_file_name = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
int flags = -1,
mode_t mode = 0666,
char * existing_memory = nullptr,
size_t alignment = 0);

View File

@ -259,6 +259,11 @@ struct Settings
*/ \
M(SettingUInt64, input_format_allow_errors_num, 0) \
M(SettingFloat, input_format_allow_errors_ratio, 0) \
\
/** Size of buffer with query results */ \
M(SettingUInt64, send_buffer_size, DBMS_DEFAULT_BUFFER_SIZE) \
M(SettingUInt64, http_response_memory_buffer_size, 0) \
M(SettingString, http_response_memory_buffer_overflow, "") \
/// Possible limits for query execution.
Limits limits;

View File

@ -364,6 +364,8 @@ namespace ErrorCodes
extern const int TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT = 359;
extern const int CANNOT_CREATE_CHARSET_CONVERTER = 360;
extern const int SEEK_POSITION_OUT_OF_BOUND = 361;
extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED = 362;
extern const int CANNOT_CREATE_IO_BUFFER = 363;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -0,0 +1,308 @@
#include <DB/IO/MemoryReadWriteBuffer.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/Common/Exception.h>
#include <common/likely.h>
#include <iostream>
namespace DB
{
namespace ErrorCodes
{
extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int LOGICAL_ERROR;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER;
extern const int CANNOT_CREATE_IO_BUFFER;
}
inline std::ostream & operator << (std::ostream & stream, BufferBase & buffer)
{
stream
<< " begin=" << reinterpret_cast<void*>(buffer.buffer().begin())
<< " pos=" << buffer.position() - buffer.buffer().begin()
<< " size=" << buffer.buffer().size()
<< " int_size=" << buffer.internalBuffer().size() << "\n";
return stream;
}
CascadeWriteBuffer::CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_)
: WriteBuffer(nullptr, 0), prepared_sources(std::move(prepared_sources_)), lazy_sources(std::move(lazy_sources_))
{
first_lazy_source_num = prepared_sources.size();
num_sources = first_lazy_source_num + lazy_sources.size();
/// fill lazy sources by nullptr
prepared_sources.resize(num_sources);
curr_buffer_num = 0;
curr_buffer = getNextBuffer();
set(curr_buffer->buffer().begin(), curr_buffer->buffer().size());
}
void CascadeWriteBuffer::nextImpl()
{
try
{
curr_buffer->position() = position();
curr_buffer->next();
}
catch (const Exception & e)
{
if (curr_buffer_num < num_sources && e.code() == ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED)
{
/// actualize position of old buffer (it was reset by WriteBuffer::next)
curr_buffer->position() = position();
/// good situation, fetch next WriteBuffer
++curr_buffer_num;
curr_buffer = getNextBuffer();
}
else
throw;
}
set(curr_buffer->buffer().begin(), curr_buffer->buffer().size());
}
void CascadeWriteBuffer::getResultBuffers(WriteBufferPtrs & res)
{
curr_buffer->position() = position();
res = std::move(prepared_sources);
curr_buffer = nullptr;
curr_buffer_num = num_sources = 0;
}
WriteBuffer * CascadeWriteBuffer::getNextBuffer()
{
if (first_lazy_source_num <= curr_buffer_num && curr_buffer_num < num_sources)
{
if (!prepared_sources[curr_buffer_num])
{
WriteBufferPtr prev_buf = (curr_buffer_num > 0) ? prepared_sources[curr_buffer_num - 1] : nullptr;
prepared_sources[curr_buffer_num] = lazy_sources[curr_buffer_num - first_lazy_source_num](prev_buf);
}
}
else if (curr_buffer_num >= num_sources)
throw Exception("There are no WriteBuffers to write result", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER);
WriteBuffer * res = prepared_sources[curr_buffer_num].get();
if (!res)
throw Exception("Required WriteBuffer is not created", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
return res;
}
class ReadBufferFromMemoryWriteBuffer : public ReadBuffer
{
public:
ReadBufferFromMemoryWriteBuffer(MemoryWriteBuffer && origin)
:
ReadBuffer(nullptr, 0),
chunk_list(std::move(origin.chunk_list)),
end_pos(origin.position())
{
chunk_head = chunk_list.begin();
setChunk();
}
bool nextImpl() override
{
if (chunk_head == chunk_list.end())
return false;
++chunk_head;
return setChunk();
}
~ReadBufferFromMemoryWriteBuffer()
{
for (const auto & range : chunk_list)
Allocator<false>().free(range.begin(), range.size());
}
private:
/// update buffers and position according to chunk_head pointer
bool setChunk()
{
if (chunk_head != chunk_list.end())
{
internalBuffer() = *chunk_head;
auto next_chunk = chunk_head;
++next_chunk;
/// It is last chunk, it should be truncated
if (next_chunk != chunk_list.end())
buffer() = internalBuffer();
else
buffer() = Buffer(internalBuffer().begin(), end_pos);
position() = buffer().begin();
}
else
{
buffer() = internalBuffer() = Buffer(nullptr, nullptr);
position() = nullptr;
}
return buffer().size() != 0;
}
using Container = std::forward_list<BufferBase::Buffer>;
Container chunk_list;
Container::iterator chunk_head;
Position end_pos;
};
MemoryWriteBuffer::MemoryWriteBuffer(size_t max_total_size_, size_t initial_chunk_size_, double growth_rate_)
: WriteBuffer(nullptr, 0), max_total_size(max_total_size_), initial_chunk_size(initial_chunk_size_), growth_rate(growth_rate_)
{
addChunk();
}
void MemoryWriteBuffer::nextImpl()
{
if (unlikely(hasPendingData()))
{
/// ignore flush
buffer() = Buffer(pos, buffer().end());
return;
}
addChunk();
}
void MemoryWriteBuffer::addChunk()
{
size_t next_chunk_size;
if (chunk_list.empty())
{
chunk_tail = chunk_list.before_begin();
next_chunk_size = initial_chunk_size;
}
else
{
next_chunk_size = std::max(1ul, static_cast<size_t>(chunk_tail->size() * growth_rate));
}
if (max_total_size)
{
if (total_chunks_size + next_chunk_size > max_total_size)
next_chunk_size = max_total_size - total_chunks_size;
if (0 == next_chunk_size)
throw Exception("MemoryWriteBuffer limit is exhausted", ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED);
}
Position begin = reinterpret_cast<Position>(Allocator<false>().alloc(next_chunk_size));
chunk_tail = chunk_list.emplace_after(chunk_tail, begin, begin + next_chunk_size);
total_chunks_size += next_chunk_size;
set(chunk_tail->begin(), chunk_tail->size());
}
std::shared_ptr<ReadBuffer> MemoryWriteBuffer::getReadBuffer()
{
auto res = std::make_shared<ReadBufferFromMemoryWriteBuffer>(std::move(*this));
/// invalidate members
chunk_list.clear();
chunk_tail = chunk_list.begin();
return res;
}
MemoryWriteBuffer::~MemoryWriteBuffer()
{
for (const auto & range : chunk_list)
Allocator<false>().free(range.begin(), range.size());
}
WriteBufferFromTemporaryFile::Ptr WriteBufferFromTemporaryFile::create(const std::string & path_template_)
{
std::string path_template = path_template_;
if (path_template.empty() || path_template.back() != 'X')
path_template += "XXXXXX";
Poco::File(Poco::Path(path_template).makeParent()).createDirectories();
int fd = mkstemp(const_cast<char *>(path_template.c_str()));
if (fd < 0)
throw Exception("Cannot create temporary file " + path_template, ErrorCodes::CANNOT_OPEN_FILE);
return Ptr(new WriteBufferFromTemporaryFile(fd, path_template));
}
WriteBufferFromTemporaryFile::~WriteBufferFromTemporaryFile()
{
/// remove temporary file if it was not passed to ReadBuffer
if (getFD() >= 0 && !getFileName().empty())
{
Poco::File(getFileName()).remove();
}
}
class ReadBufferFromTemporaryWriteBuffer : public ReadBufferFromFile
{
public:
static ReadBufferPtr createFrom(WriteBufferFromTemporaryFile * origin)
{
int fd = origin->getFD();
std::string file_name = origin->getFileName();
off_t res = lseek(fd, 0, SEEK_SET);
if (-1 == res)
throwFromErrno("Cannot reread temporary file " + file_name, ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
return std::make_shared<ReadBufferFromTemporaryWriteBuffer>(fd, file_name);
}
ReadBufferFromTemporaryWriteBuffer(int fd, const std::string & file_name)
: ReadBufferFromFile(fd, file_name)
{}
~ReadBufferFromTemporaryWriteBuffer() override
{
/// remove temporary file
Poco::File(file_name).remove();
}
};
ReadBufferPtr WriteBufferFromTemporaryFile::getReadBuffer()
{
/// ignore buffer, write all data to file and reread it from disk
sync();
auto res = ReadBufferFromTemporaryWriteBuffer::createFrom(this);
/// invalidate FD to avoid close(fd) in destructor
setFD(-1);
file_name = {};
return res;
}
}

View File

@ -61,11 +61,14 @@ ReadBufferFromFile::ReadBufferFromFile(
ReadBufferFromFile::ReadBufferFromFile(
int fd,
const std::string & original_file_name,
size_t buf_size,
int flags,
char * existing_memory,
size_t alignment)
: ReadBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")")
:
ReadBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment),
file_name(original_file_name.empty() ? "(fd = " + toString(fd) + ")" : original_file_name)
{
}

View File

@ -66,12 +66,13 @@ WriteBufferFromFile::WriteBufferFromFile(
/// Use pre-opened file descriptor.
WriteBufferFromFile::WriteBufferFromFile(
int fd,
const std::string & original_file_name,
size_t buf_size,
int flags,
mode_t mode,
char * existing_memory,
size_t alignment)
: WriteBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")")
:
WriteBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment),
file_name(original_file_name.empty() ? "(fd = " + toString(fd) + ")" : original_file_name)
{
}

View File

@ -0,0 +1,193 @@
#include <gtest/gtest.h>
#include "/home/vludv/ClickHouse/contrib/googletest/include/gtest/gtest.h"
#include <memory.h>
#include <Poco/File.h>
#include <DB/IO/MemoryReadWriteBuffer.h>
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/copyData.h>
using namespace DB;
static std::string makeTestArray(size_t size)
{
std::string res(size, '\0');
for (size_t i = 0; i < res.size(); ++i)
res[i] = i % 256;
return res;
}
static void testCascadeBufferRedability(
std::string data,
CascadeWriteBuffer::WriteBufferPtrs && arg1,
CascadeWriteBuffer::WriteBufferConstructors && arg2)
{
CascadeWriteBuffer cascade{std::move(arg1), std::move(arg2)};
cascade.write(&data[0], data.size());
EXPECT_EQ(cascade.count(), data.size());
std::vector<WriteBufferPtr> write_buffers;
std::vector<ReadBufferPtr> read_buffers;
std::vector<ReadBuffer *> read_buffers_raw;
cascade.getResultBuffers(write_buffers);
for (WriteBufferPtr & wbuf : write_buffers)
{
if (!wbuf)
continue;
auto wbuf_readable = dynamic_cast<IReadableWriteBuffer *>(wbuf.get());
ASSERT_FALSE(!wbuf_readable);
auto rbuf = wbuf_readable->getReadBuffer();
ASSERT_FALSE(!rbuf);
read_buffers.emplace_back(rbuf);
read_buffers_raw.emplace_back(rbuf.get());
}
ConcatReadBuffer concat(read_buffers_raw);
std::string decoded_data;
{
WriteBufferFromString decoded_data_writer(decoded_data);
copyData(concat, decoded_data_writer);
}
ASSERT_EQ(data, decoded_data);
}
TEST(CascadeWriteBuffer, RereadWithTwoMemoryBuffers)
try
{
size_t max_s = 32;
for (size_t s = 0; s < max_s; ++s)
{
testCascadeBufferRedability(makeTestArray(s),
{
std::make_shared<MemoryWriteBuffer>(s/2, 1, 2.0),
std::make_shared<MemoryWriteBuffer>(s - s/2, 1, 2.0)
},
{}
);
testCascadeBufferRedability(makeTestArray(s),
{
std::make_shared<MemoryWriteBuffer>(s, 2, 1.5),
},
{}
);
testCascadeBufferRedability(makeTestArray(s),
{
std::make_shared<MemoryWriteBuffer>(0, 1, 1.0),
},
{}
);
testCascadeBufferRedability(makeTestArray(s),
{
std::make_shared<MemoryWriteBuffer>(std::max(1ul, s/2), std::max(2ul, s/4), 0.5),
std::make_shared<MemoryWriteBuffer>(0, 4, 1.0),
},
{}
);
testCascadeBufferRedability(makeTestArray(max_s),
{
std::make_shared<MemoryWriteBuffer>(s, 1, 2.0)
},
{
[=] (auto prev) { return std::make_shared<MemoryWriteBuffer>(max_s - s, 1, 2.0); }
}
);
testCascadeBufferRedability(makeTestArray(max_s),
{},
{
[=] (auto prev) { return std::make_shared<MemoryWriteBuffer>(max_s - s, 1, 2.0); },
[=] (auto prev) { return std::make_shared<MemoryWriteBuffer>(s, 1, 2.0); }
}
);
}
}
catch (const DB::Exception & e)
{
std::cerr << getCurrentExceptionMessage(true) << "\n";
throw;
}
catch (...)
{
throw;
}
TEST(TemporaryFileWriteBuffer, WriteAndReread)
try
{
for (size_t s = 0; s < 2500000; s += 500000)
{
std::string tmp_template = "tmp/TemporaryFileWriteBuffer.XXXXXX";
std::string data = makeTestArray(s);
auto buf = WriteBufferFromTemporaryFile::create(tmp_template);
buf->write(&data[0], data.size());
std::string tmp_filename = buf->getFileName();
ASSERT_EQ(tmp_template.size(), tmp_filename.size());
auto reread_buf = buf->getReadBuffer();
std::string decoded_data;
{
WriteBufferFromString wbuf_decode(decoded_data);
copyData(*reread_buf, wbuf_decode);
}
ASSERT_EQ(data.size(), decoded_data.size());
ASSERT_TRUE(data == decoded_data);
buf.reset();
reread_buf.reset();
ASSERT_TRUE(!Poco::File(tmp_filename).exists());
}
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << "\n";
throw;
}
TEST(CascadeWriteBuffer, RereadWithTemporaryFileWriteBuffer)
try
{
const std::string tmp_template = "tmp/RereadWithTemporaryFileWriteBuffer.XXXXXX";
for (size_t s = 0; s < 4000000; s += 1000000)
{
testCascadeBufferRedability(makeTestArray(s),
{},
{
[=] (auto prev) { return WriteBufferFromTemporaryFile::create(tmp_template); }
}
);
testCascadeBufferRedability(makeTestArray(s),
{
std::make_shared<MemoryWriteBuffer>(std::max(1ul, s/3ul), 2, 1.5),
},
{
[=] (auto prev) { return WriteBufferFromTemporaryFile::create(tmp_template); }
}
);
}
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << "\n";
throw;
}

View File

@ -1,9 +1,11 @@
#include <iomanip>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/File.h>
#include <DB/Common/ExternalTable.h>
#include <DB/Common/StringUtils.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/ZlibInflatingReadBuffer.h>
@ -13,7 +15,11 @@
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteBufferFromHTTPServerResponse.h>
#include <DB/IO/MemoryReadWriteBuffer.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/copyData.h>
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
@ -36,6 +42,7 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_DATE;
extern const int CANNOT_PARSE_DATETIME;
extern const int CANNOT_PARSE_NUMBER;
extern const int CANNOT_OPEN_FILE;
extern const int UNKNOWN_ELEMENT_IN_AST;
extern const int UNKNOWN_TYPE_OF_AST_NODE;
@ -103,7 +110,8 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti
return HTTPResponse::HTTP_REQUESTENTITYTOOLARGE;
else if (exception_code == ErrorCodes::NOT_IMPLEMENTED)
return HTTPResponse::HTTP_NOT_IMPLEMENTED;
else if (exception_code == ErrorCodes::SOCKET_TIMEOUT)
else if (exception_code == ErrorCodes::SOCKET_TIMEOUT ||
exception_code == ErrorCodes::CANNOT_OPEN_FILE)
return HTTPResponse::HTTP_SERVICE_UNAVAILABLE;
return HTTPResponse::HTTP_INTERNAL_SERVER_ERROR;
@ -133,37 +141,6 @@ void HTTPHandler::processQuery(
query_param += '\n';
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
bool client_supports_http_compression = false;
ZlibCompressionMethod http_response_compression_method {};
if (!http_response_compression_methods.empty())
{
/// Both gzip and deflate are supported. If the client supports both, gzip is preferred.
/// NOTE parsing of the list of methods is slightly incorrect.
if (std::string::npos != http_response_compression_methods.find("gzip"))
{
client_supports_http_compression = true;
http_response_compression_method = ZlibCompressionMethod::Gzip;
}
else if (std::string::npos != http_response_compression_methods.find("deflate"))
{
client_supports_http_compression = true;
http_response_compression_method = ZlibCompressionMethod::Zlib;
}
}
used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(
response, client_supports_http_compression, http_response_compression_method);
/// Client can pass a 'compress' flag in the query string. In this case the query result is
/// compressed using internal algorithm. This is not reflected in HTTP headers.
if (parse<bool>(params.get("compress", "0")))
used_output.out_maybe_compressed = std::make_shared<CompressedWriteBuffer>(*used_output.out);
else
used_output.out_maybe_compressed = used_output.out;
/// User name and password can be passed using query parameters or using HTTP Basic auth (both methods are insecure).
/// The user and password can be passed by headers (similar to X-Auth-*), which is used by load balancers to pass authentication information
std::string user = request.get("X-ClickHouse-User", params.get("user", "default"));
@ -186,6 +163,96 @@ void HTTPHandler::processQuery(
context.setUser(user, password, request.clientAddress(), quota_key);
context.setCurrentQueryId(query_id);
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
bool client_supports_http_compression = false;
ZlibCompressionMethod http_response_compression_method {};
if (!http_response_compression_methods.empty())
{
/// Both gzip and deflate are supported. If the client supports both, gzip is preferred.
/// NOTE parsing of the list of methods is slightly incorrect.
if (std::string::npos != http_response_compression_methods.find("gzip"))
{
client_supports_http_compression = true;
http_response_compression_method = ZlibCompressionMethod::Gzip;
}
else if (std::string::npos != http_response_compression_methods.find("deflate"))
{
client_supports_http_compression = true;
http_response_compression_method = ZlibCompressionMethod::Zlib;
}
}
/// Client can pass a 'compress' flag in the query string. In this case the query result is
/// compressed using internal algorithm. This is not reflected in HTTP headers.
bool internal_compression = params.getParsed<bool>("compress", false);
size_t response_buffer_size = params.getParsed<size_t>("buffer_size", DBMS_DEFAULT_BUFFER_SIZE);
response_buffer_size = response_buffer_size ? response_buffer_size : DBMS_DEFAULT_BUFFER_SIZE;
auto response_raw = std::make_shared<WriteBufferFromHTTPServerResponse>(
response, client_supports_http_compression, http_response_compression_method, response_buffer_size);
WriteBufferPtr response_maybe_compressed;
size_t result_buffer_memory_size = params.getParsed<size_t>("result_buffer_size", 0);
bool use_memory_buffer = result_buffer_memory_size;
bool result_buffer_overflow_to_disk = params.get("result_buffer_on_overflow", "http") == "disk";
if (use_memory_buffer)
{
ConcatWriteBuffer::WriteBufferPtrs concat_buffers1{ std::make_shared<MemoryWriteBuffer>(result_buffer_memory_size) };
ConcatWriteBuffer::WriteBufferConstructors concat_buffers2{};
if (result_buffer_overflow_to_disk)
{
std::string tmp_path_template = context.getTemporaryPath() + "http_buffers/" + escapeForFileName(user) + ".XXXXXX";
auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &)
{
return WriteBufferFromTemporaryFile::create(tmp_path_template);
};
concat_buffers2.emplace_back(std::move(create_tmp_disk_buffer));
}
else
{
auto rewrite_memory_buffer_to_http_and_continue = [response_raw] (const WriteBufferPtr & prev_buf)
{
auto memory_write_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get());
if (!memory_write_buffer)
throw Exception("Memory buffer was not allocated", ErrorCodes::LOGICAL_ERROR);
auto memory_read_buffer = memory_write_buffer->getReadBuffer();
copyData(*memory_read_buffer, *response_raw);
return response_raw;
};
concat_buffers2.emplace_back(rewrite_memory_buffer_to_http_and_continue);
}
used_output.out = response_raw;
used_output.delayed_out_raw = std::make_shared<ConcatWriteBuffer>(std::move(concat_buffers1), std::move(concat_buffers2));
if (internal_compression)
used_output.delayed_out_maybe_compressed = std::make_shared<CompressedWriteBuffer>(*used_output.delayed_out_raw);
else
used_output.delayed_out_maybe_compressed = used_output.delayed_out_raw;
used_output.out_maybe_compressed = used_output.delayed_out_maybe_compressed;
}
else
{
used_output.out = response_raw;
if (internal_compression)
used_output.out_maybe_compressed = std::make_shared<CompressedWriteBuffer>(*response_raw);
else
used_output.out_maybe_compressed = response_raw;
}
std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query_param);
std::unique_ptr<ReadBuffer> in_post_raw = std::make_unique<ReadBufferFromIStream>(istr);
@ -268,7 +335,11 @@ void HTTPHandler::processQuery(
auto readonly_before_query = limits.readonly;
for (Poco::Net::NameValueCollection::ConstIterator it = params.begin(); it != params.end(); ++it)
NameSet reserved_param_names{"query", "compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace",
"buffer_size", "result_buffer_size", "result_buffer_on_overflow"
};
for (auto it = params.begin(); it != params.end(); ++it)
{
if (it->first == "database")
{
@ -278,14 +349,7 @@ void HTTPHandler::processQuery(
{
context.setDefaultFormat(it->second);
}
else if (it->first == "query"
|| it->first == "compress"
|| it->first == "decompress"
|| it->first == "user"
|| it->first == "password"
|| it->first == "quota_key"
|| it->first == "query_id"
|| it->first == "stacktrace")
else if (reserved_param_names.find(it->first) != reserved_param_names.end())
{
}
else
@ -346,6 +410,46 @@ void HTTPHandler::processQuery(
executeQuery(*in, *used_output.out_maybe_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); });
if (use_memory_buffer)
{
std::vector<WriteBufferPtr> write_buffers;
used_output.delayed_out_raw->getResultBuffers(write_buffers);
std::vector<ReadBufferPtr> read_buffers;
ConcatReadBuffer::ReadBuffers read_buffers_raw_ptr;
for (auto & write_buf : write_buffers)
{
IReadableWriteBuffer * write_buf_concrete;
if (write_buf && (write_buf_concrete = dynamic_cast<IReadableWriteBuffer *>(write_buf.get())))
{
read_buffers.emplace_back(write_buf_concrete->getReadBuffer());
read_buffers_raw_ptr.emplace_back(read_buffers.back().get());
}
}
if (result_buffer_overflow_to_disk)
{
/// All results in concat buffer
if (read_buffers_raw_ptr.empty())
throw Exception("There are no buffers to overwrite result into response", ErrorCodes::LOGICAL_ERROR);
ConcatReadBuffer concat_read_buffer(read_buffers_raw_ptr);
copyData(concat_read_buffer, *used_output.out);
}
else
{
/// Results could be either in the first buffer, or they could be already pushed to response
if (!write_buffers.at(1))
{
/// Results was not pushed to reponse
copyData(*read_buffers.at(0), *used_output.out);
}
}
used_output.delayed_out_raw.reset();
used_output.delayed_out_maybe_compressed.reset();
}
/// Send HTTP headers with code 200 if no exception happened and the data is still not sent to
/// the client.
used_output.out->finalize();
@ -435,16 +539,11 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
{
tryLogCurrentException(log);
std::string exception_message = getCurrentExceptionMessage(with_stacktrace);
int exception_code = getCurrentExceptionCode();
/** If exception is received from remote server, then stack trace is embedded in message.
* If exception is thrown on local server, then stack trace is in separate field.
*/
auto embedded_stack_trace_pos = exception_message.find("Stack trace");
if (std::string::npos != embedded_stack_trace_pos && !with_stacktrace)
exception_message.resize(embedded_stack_trace_pos);
std::string exception_message = getCurrentExceptionMessage(with_stacktrace, true);
int exception_code = getCurrentExceptionCode();
trySendExceptionToClient(exception_message, exception_code, request, response, used_output);
}

View File

@ -13,6 +13,8 @@ namespace DB
{
class WriteBufferFromHTTPServerResponse;
class CascadeWriteBuffer;
class HTTPHandler : public Poco::Net::HTTPRequestHandler
{
@ -27,6 +29,9 @@ private:
std::shared_ptr<WriteBufferFromHTTPServerResponse> out;
/// Used for sending response. Points to 'out', or to CompressedWriteBuffer(*out), depending on settings.
std::shared_ptr<WriteBuffer> out_maybe_compressed;
std::shared_ptr<CascadeWriteBuffer> delayed_out_raw;
std::shared_ptr<WriteBuffer> delayed_out_maybe_compressed;
};
Server & server;

View File

@ -0,0 +1,14 @@
# Google Test unit tests
# gather all **/gtest*.cpp files with unit tests into single gtest executable. Not CMake-way
execute_process(COMMAND find ${CMAKE_SOURCE_DIR}/dbms -path "*/tests/gtest*.cpp" -printf "%p;" OUTPUT_VARIABLE sources_with_gtest)
if (ENABLE_TESTS AND sources_with_gtest)
add_definitions(-DGTEST_HAS_POSIX_RE=0)
add_executable(unit_tests ${sources_with_gtest})
target_link_libraries(unit_tests gtest_main dbms)
target_include_directories(unit_tests PRIVATE
${ClickHouse_SOURCE_DIR}/dbms/include
)
add_test(unit_tests unit_tests)
endif()