diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 0aba2218671..2f763cfb2f2 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -45,3 +45,10 @@ endif () if (NOT AARCH64) add_subdirectory (libcpuid) endif () + +if (ENABLE_TESTS) + add_subdirectory (googletest) + # avoid problems with the system POSIX <regex.h> + target_compile_definitions (gtest INTERFACE -DGTEST_HAS_POSIX_RE=0) + target_include_directories (gtest INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/googletest/include) +endif() diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 5e4d3b43fd5..e1e84b68f01 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -33,6 +33,7 @@ endif () find_package (Threads) add_subdirectory (src) +add_subdirectory (tests/unit_tests) add_library(string_utils include/DB/Common/StringUtils.h diff --git a/dbms/include/DB/Common/HTMLForm.h b/dbms/include/DB/Common/HTMLForm.h index 0ed9f291111..b760b5c408b 100644 --- a/dbms/include/DB/Common/HTMLForm.h +++ b/dbms/include/DB/Common/HTMLForm.h @@ -5,6 +5,8 @@ #include #include +#include + /** Somehow, in case of POST, Poco::Net::HTMLForm doesn't read parameters from URL, only from body. * This helper allows to read parameters just from URL. @@ -23,4 +25,18 @@ struct HTMLForm : public Poco::Net::HTMLForm std::istringstream istr(uri.getRawQuery()); readUrl(istr); } + + + template + T getParsed(const std::string & key, T default_value) + { + auto it = find(key); + return (it != end()) ? 
DB::parse(it->second) : default_value; + } + + template + T getParsed(const std::string & key) + { + return DB::parse(get(key)); + } }; diff --git a/dbms/include/DB/IO/MemoryReadWriteBuffer.h b/dbms/include/DB/IO/MemoryReadWriteBuffer.h new file mode 100644 index 00000000000..d38f567a5f3 --- /dev/null +++ b/dbms/include/DB/IO/MemoryReadWriteBuffer.h @@ -0,0 +1,125 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED; + extern const int MEMORY_LIMIT_EXCEEDED; + extern const int LOGICAL_ERROR; +} + +class CascadeWriteBuffer : public WriteBuffer +{ +public: + + using WriteBufferPtrs = std::vector; + using WriteBufferConstructor = std::function; + using WriteBufferConstructors = std::vector; + + CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_ = {}); + + void nextImpl() override; + + /// Should be called once + void getResultBuffers(WriteBufferPtrs & res); + +private: + + WriteBuffer * getNextBuffer(); + + WriteBufferPtrs prepared_sources; + WriteBufferConstructors lazy_sources; + size_t first_lazy_source_num; + size_t num_sources; + + WriteBuffer * curr_buffer; + size_t curr_buffer_num; +}; + + +class ReadBufferFromMemoryWriteBuffer; +class ReadBufferFromTemporaryWriteBuffer; + + +struct IReadableWriteBuffer +{ + /// Creates read buffer from current write buffer + /// Returned buffer points to the first byte of original buffer and finds with current position. + /// Original stream becomes invalid. 
+ virtual std::shared_ptr getReadBuffer() = 0; + + virtual ~IReadableWriteBuffer() {} +}; + + +/// Allow to write large data into memory +class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer +{ +public: + + MemoryWriteBuffer( + size_t max_total_size_ = 0, + size_t initial_chunk_size_ = DBMS_DEFAULT_BUFFER_SIZE, + double growth_rate_ = 2.0); + + void nextImpl() override; + + std::shared_ptr getReadBuffer() override; + + ~MemoryWriteBuffer() override; + +protected: + + const size_t max_total_size; + const size_t initial_chunk_size; + const double growth_rate; + + using Container = std::forward_list; + + Container chunk_list; + Container::iterator chunk_tail; + size_t total_chunks_size = 0; + + void addChunk(); + + friend class ReadBufferFromMemoryWriteBuffer; +}; + + +/// Rereadable WriteBuffer, could be used as disk buffer +/// Creates unique temporary in directory (and directory itself) +class WriteBufferFromTemporaryFile : public WriteBufferFromFile, public IReadableWriteBuffer +{ +public: + using Ptr = std::shared_ptr; + + /// path_template examle "/opt/clickhouse/tmp/data.XXXXXX" + static Ptr create(const std::string & path_template_); + + std::shared_ptr getReadBuffer() override; + + ~WriteBufferFromTemporaryFile() override; + +protected: + + WriteBufferFromTemporaryFile(int fd, const std::string & tmp_path) + : WriteBufferFromFile(fd, tmp_path) + {} +}; + + +} diff --git a/dbms/include/DB/IO/ReadBuffer.h b/dbms/include/DB/IO/ReadBuffer.h index 81336f645f5..3c866dbdca4 100644 --- a/dbms/include/DB/IO/ReadBuffer.h +++ b/dbms/include/DB/IO/ReadBuffer.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -162,4 +163,7 @@ private: }; +using ReadBufferPtr = std::shared_ptr; + + } diff --git a/dbms/include/DB/IO/ReadBufferFromFile.h b/dbms/include/DB/IO/ReadBufferFromFile.h index 479f6fceadd..7faa81b0607 100644 --- a/dbms/include/DB/IO/ReadBufferFromFile.h +++ b/dbms/include/DB/IO/ReadBufferFromFile.h @@ -20,7 +20,7 @@ namespace DB 
*/ class ReadBufferFromFile : public ReadBufferFromFileDescriptor { -private: +protected: std::string file_name; CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead}; @@ -29,7 +29,7 @@ public: char * existing_memory = nullptr, size_t alignment = 0); /// Use pre-opened file descriptor. - ReadBufferFromFile(int fd, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, + ReadBufferFromFile(int fd, const std::string & original_file_name = {}, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, char * existing_memory = nullptr, size_t alignment = 0); ~ReadBufferFromFile() override; diff --git a/dbms/include/DB/IO/WriteBuffer.h b/dbms/include/DB/IO/WriteBuffer.h index 62d2e377c12..8d6aa80d488 100644 --- a/dbms/include/DB/IO/WriteBuffer.h +++ b/dbms/include/DB/IO/WriteBuffer.h @@ -2,6 +2,8 @@ #include #include +#include +#include #include #include @@ -96,4 +98,7 @@ private: }; +using WriteBufferPtr = std::shared_ptr; + + } diff --git a/dbms/include/DB/IO/WriteBufferFromFile.h b/dbms/include/DB/IO/WriteBufferFromFile.h index 5252ed4d39e..521fb2e1aab 100644 --- a/dbms/include/DB/IO/WriteBufferFromFile.h +++ b/dbms/include/DB/IO/WriteBufferFromFile.h @@ -24,7 +24,7 @@ namespace DB */ class WriteBufferFromFile : public WriteBufferFromFileDescriptor { -private: +protected: std::string file_name; CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForWrite}; @@ -40,9 +40,8 @@ public: /// Use pre-opened file descriptor. 
WriteBufferFromFile( int fd, + const std::string & original_file_name = {}, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, - int flags = -1, - mode_t mode = 0666, char * existing_memory = nullptr, size_t alignment = 0); diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 373d3eb8088..47aabbc1f12 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -259,6 +259,11 @@ struct Settings */ \ M(SettingUInt64, input_format_allow_errors_num, 0) \ M(SettingFloat, input_format_allow_errors_ratio, 0) \ + \ + /** Size of buffer with query results */ \ + M(SettingUInt64, send_buffer_size, DBMS_DEFAULT_BUFFER_SIZE) \ + M(SettingUInt64, http_response_memory_buffer_size, 0) \ + M(SettingString, http_response_memory_buffer_overflow, "") \ /// Possible limits for query execution. Limits limits; diff --git a/dbms/src/Core/ErrorCodes.cpp b/dbms/src/Core/ErrorCodes.cpp index dcefe9d64d9..b1b4b9b3221 100644 --- a/dbms/src/Core/ErrorCodes.cpp +++ b/dbms/src/Core/ErrorCodes.cpp @@ -364,6 +364,8 @@ namespace ErrorCodes extern const int TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT = 359; extern const int CANNOT_CREATE_CHARSET_CONVERTER = 360; extern const int SEEK_POSITION_OUT_OF_BOUND = 361; + extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED = 362; + extern const int CANNOT_CREATE_IO_BUFFER = 363; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/IO/MemoryReadWriteBuffer.cpp b/dbms/src/IO/MemoryReadWriteBuffer.cpp new file mode 100644 index 00000000000..71a572e85ac --- /dev/null +++ b/dbms/src/IO/MemoryReadWriteBuffer.cpp @@ -0,0 +1,308 @@ +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED; + extern const int MEMORY_LIMIT_EXCEEDED; + extern const int LOGICAL_ERROR; + extern const int CANNOT_OPEN_FILE; + extern const int 
CANNOT_SEEK_THROUGH_FILE; + extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER; + extern const int CANNOT_CREATE_IO_BUFFER; +} + + +inline std::ostream & operator << (std::ostream & stream, BufferBase & buffer) +{ + stream + << " begin=" << reinterpret_cast(buffer.buffer().begin()) + << " pos=" << buffer.position() - buffer.buffer().begin() + << " size=" << buffer.buffer().size() + << " int_size=" << buffer.internalBuffer().size() << "\n"; + + return stream; +} + + +CascadeWriteBuffer::CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_) + : WriteBuffer(nullptr, 0), prepared_sources(std::move(prepared_sources_)), lazy_sources(std::move(lazy_sources_)) +{ + first_lazy_source_num = prepared_sources.size(); + num_sources = first_lazy_source_num + lazy_sources.size(); + + /// fill lazy sources by nullptr + prepared_sources.resize(num_sources); + + curr_buffer_num = 0; + curr_buffer = getNextBuffer(); + set(curr_buffer->buffer().begin(), curr_buffer->buffer().size()); +} + + +void CascadeWriteBuffer::nextImpl() +{ + try + { + curr_buffer->position() = position(); + curr_buffer->next(); + } + catch (const Exception & e) + { + if (curr_buffer_num < num_sources && e.code() == ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED) + { + /// actualize position of old buffer (it was reset by WriteBuffer::next) + curr_buffer->position() = position(); + + /// good situation, fetch next WriteBuffer + ++curr_buffer_num; + curr_buffer = getNextBuffer(); + } + else + throw; + } + + set(curr_buffer->buffer().begin(), curr_buffer->buffer().size()); +} + + +void CascadeWriteBuffer::getResultBuffers(WriteBufferPtrs & res) +{ + curr_buffer->position() = position(); + res = std::move(prepared_sources); + + curr_buffer = nullptr; + curr_buffer_num = num_sources = 0; +} + + +WriteBuffer * CascadeWriteBuffer::getNextBuffer() +{ + if (first_lazy_source_num <= curr_buffer_num && curr_buffer_num < num_sources) + { + if (!prepared_sources[curr_buffer_num]) + 
{ + WriteBufferPtr prev_buf = (curr_buffer_num > 0) ? prepared_sources[curr_buffer_num - 1] : nullptr; + prepared_sources[curr_buffer_num] = lazy_sources[curr_buffer_num - first_lazy_source_num](prev_buf); + } + } + else if (curr_buffer_num >= num_sources) + throw Exception("There are no WriteBuffers to write result", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER); + + WriteBuffer * res = prepared_sources[curr_buffer_num].get(); + if (!res) + throw Exception("Required WriteBuffer is not created", ErrorCodes::CANNOT_CREATE_IO_BUFFER); + + return res; +} + + +class ReadBufferFromMemoryWriteBuffer : public ReadBuffer +{ +public: + + ReadBufferFromMemoryWriteBuffer(MemoryWriteBuffer && origin) + : + ReadBuffer(nullptr, 0), + chunk_list(std::move(origin.chunk_list)), + end_pos(origin.position()) + { + chunk_head = chunk_list.begin(); + setChunk(); + } + + bool nextImpl() override + { + if (chunk_head == chunk_list.end()) + return false; + + ++chunk_head; + return setChunk(); + } + + ~ReadBufferFromMemoryWriteBuffer() + { + for (const auto & range : chunk_list) + Allocator().free(range.begin(), range.size()); + } + +private: + + /// update buffers and position according to chunk_head pointer + bool setChunk() + { + if (chunk_head != chunk_list.end()) + { + internalBuffer() = *chunk_head; + + auto next_chunk = chunk_head; + ++next_chunk; + + /// It is last chunk, it should be truncated + if (next_chunk != chunk_list.end()) + buffer() = internalBuffer(); + else + buffer() = Buffer(internalBuffer().begin(), end_pos); + + position() = buffer().begin(); + } + else + { + buffer() = internalBuffer() = Buffer(nullptr, nullptr); + position() = nullptr; + } + + return buffer().size() != 0; + } + + using Container = std::forward_list; + + Container chunk_list; + Container::iterator chunk_head; + Position end_pos; +}; + + + +MemoryWriteBuffer::MemoryWriteBuffer(size_t max_total_size_, size_t initial_chunk_size_, double growth_rate_) +: WriteBuffer(nullptr, 0), 
max_total_size(max_total_size_), initial_chunk_size(initial_chunk_size_), growth_rate(growth_rate_) +{ + addChunk(); +} + +void MemoryWriteBuffer::nextImpl() +{ + if (unlikely(hasPendingData())) + { + /// ignore flush + buffer() = Buffer(pos, buffer().end()); + return; + } + + addChunk(); +} + +void MemoryWriteBuffer::addChunk() +{ + size_t next_chunk_size; + if (chunk_list.empty()) + { + chunk_tail = chunk_list.before_begin(); + next_chunk_size = initial_chunk_size; + } + else + { + next_chunk_size = std::max(1ul, static_cast(chunk_tail->size() * growth_rate)); + } + + if (max_total_size) + { + if (total_chunks_size + next_chunk_size > max_total_size) + next_chunk_size = max_total_size - total_chunks_size; + + if (0 == next_chunk_size) + throw Exception("MemoryWriteBuffer limit is exhausted", ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED); + } + + Position begin = reinterpret_cast(Allocator().alloc(next_chunk_size)); + chunk_tail = chunk_list.emplace_after(chunk_tail, begin, begin + next_chunk_size); + total_chunks_size += next_chunk_size; + + set(chunk_tail->begin(), chunk_tail->size()); +} + +std::shared_ptr MemoryWriteBuffer::getReadBuffer() +{ + auto res = std::make_shared(std::move(*this)); + + /// invalidate members + chunk_list.clear(); + chunk_tail = chunk_list.begin(); + + return res; +} + +MemoryWriteBuffer::~MemoryWriteBuffer() +{ + for (const auto & range : chunk_list) + Allocator().free(range.begin(), range.size()); +} + + +WriteBufferFromTemporaryFile::Ptr WriteBufferFromTemporaryFile::create(const std::string & path_template_) +{ + std::string path_template = path_template_; + + if (path_template.empty() || path_template.back() != 'X') + path_template += "XXXXXX"; + + Poco::File(Poco::Path(path_template).makeParent()).createDirectories(); + + int fd = mkstemp(const_cast(path_template.c_str())); + if (fd < 0) + throw Exception("Cannot create temporary file " + path_template, ErrorCodes::CANNOT_OPEN_FILE); + + return Ptr(new 
WriteBufferFromTemporaryFile(fd, path_template)); +} + + +WriteBufferFromTemporaryFile::~WriteBufferFromTemporaryFile() +{ + /// remove temporary file if it was not passed to ReadBuffer + if (getFD() >= 0 && !getFileName().empty()) + { + Poco::File(getFileName()).remove(); + } +} + + +class ReadBufferFromTemporaryWriteBuffer : public ReadBufferFromFile +{ +public: + + static ReadBufferPtr createFrom(WriteBufferFromTemporaryFile * origin) + { + int fd = origin->getFD(); + std::string file_name = origin->getFileName(); + + off_t res = lseek(fd, 0, SEEK_SET); + if (-1 == res) + throwFromErrno("Cannot reread temporary file " + file_name, ErrorCodes::CANNOT_SEEK_THROUGH_FILE); + + return std::make_shared(fd, file_name); + } + + ReadBufferFromTemporaryWriteBuffer(int fd, const std::string & file_name) + : ReadBufferFromFile(fd, file_name) + {} + + ~ReadBufferFromTemporaryWriteBuffer() override + { + /// remove temporary file + Poco::File(file_name).remove(); + } +}; + +ReadBufferPtr WriteBufferFromTemporaryFile::getReadBuffer() +{ + /// ignore buffer, write all data to file and reread it from disk + sync(); + + auto res = ReadBufferFromTemporaryWriteBuffer::createFrom(this); + + /// invalidate FD to avoid close(fd) in destructor + setFD(-1); + file_name = {}; + + return res; +} + + +} diff --git a/dbms/src/IO/ReadBufferFromFile.cpp b/dbms/src/IO/ReadBufferFromFile.cpp index eff1d8aebf2..b925d35baa0 100644 --- a/dbms/src/IO/ReadBufferFromFile.cpp +++ b/dbms/src/IO/ReadBufferFromFile.cpp @@ -61,11 +61,14 @@ ReadBufferFromFile::ReadBufferFromFile( ReadBufferFromFile::ReadBufferFromFile( int fd, + const std::string & original_file_name, size_t buf_size, int flags, char * existing_memory, size_t alignment) - : ReadBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")") + : + ReadBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), + file_name(original_file_name.empty() ? 
"(fd = " + toString(fd) + ")" : original_file_name) { } diff --git a/dbms/src/IO/WriteBufferFromFile.cpp b/dbms/src/IO/WriteBufferFromFile.cpp index 4c00337433c..cb547c31156 100644 --- a/dbms/src/IO/WriteBufferFromFile.cpp +++ b/dbms/src/IO/WriteBufferFromFile.cpp @@ -66,12 +66,13 @@ WriteBufferFromFile::WriteBufferFromFile( /// Use pre-opened file descriptor. WriteBufferFromFile::WriteBufferFromFile( int fd, + const std::string & original_file_name, size_t buf_size, - int flags, - mode_t mode, char * existing_memory, size_t alignment) - : WriteBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), file_name("(fd = " + toString(fd) + ")") + : + WriteBufferFromFileDescriptor(fd, buf_size, existing_memory, alignment), + file_name(original_file_name.empty() ? "(fd = " + toString(fd) + ")" : original_file_name) { } diff --git a/dbms/src/IO/tests/gtest_cascade_and_memory_write_buffer.cpp b/dbms/src/IO/tests/gtest_cascade_and_memory_write_buffer.cpp new file mode 100644 index 00000000000..1a7a3c180b8 --- /dev/null +++ b/dbms/src/IO/tests/gtest_cascade_and_memory_write_buffer.cpp @@ -0,0 +1,193 @@ +#include +#include <gtest/gtest.h> +#include + +#include + +#include +#include +#include +#include + +using namespace DB; + + +static std::string makeTestArray(size_t size) +{ + std::string res(size, '\0'); + for (size_t i = 0; i < res.size(); ++i) + res[i] = i % 256; + return res; +} + +static void testCascadeBufferRedability( + std::string data, + CascadeWriteBuffer::WriteBufferPtrs && arg1, + CascadeWriteBuffer::WriteBufferConstructors && arg2) +{ + CascadeWriteBuffer cascade{std::move(arg1), std::move(arg2)}; + + cascade.write(&data[0], data.size()); + EXPECT_EQ(cascade.count(), data.size()); + + std::vector write_buffers; + std::vector read_buffers; + std::vector read_buffers_raw; + cascade.getResultBuffers(write_buffers); + + for (WriteBufferPtr & wbuf : write_buffers) + { + if (!wbuf) + continue; + + auto 
wbuf_readable = dynamic_cast(wbuf.get()); + ASSERT_FALSE(!wbuf_readable); + + auto rbuf = wbuf_readable->getReadBuffer(); + ASSERT_FALSE(!rbuf); + + read_buffers.emplace_back(rbuf); + read_buffers_raw.emplace_back(rbuf.get()); + } + + ConcatReadBuffer concat(read_buffers_raw); + std::string decoded_data; + { + WriteBufferFromString decoded_data_writer(decoded_data); + copyData(concat, decoded_data_writer); + } + + ASSERT_EQ(data, decoded_data); +} + + +TEST(CascadeWriteBuffer, RereadWithTwoMemoryBuffers) +try +{ + size_t max_s = 32; + for (size_t s = 0; s < max_s; ++s) + { + testCascadeBufferRedability(makeTestArray(s), + { + std::make_shared(s/2, 1, 2.0), + std::make_shared(s - s/2, 1, 2.0) + }, + {} + ); + + testCascadeBufferRedability(makeTestArray(s), + { + std::make_shared(s, 2, 1.5), + }, + {} + ); + + testCascadeBufferRedability(makeTestArray(s), + { + std::make_shared(0, 1, 1.0), + }, + {} + ); + + testCascadeBufferRedability(makeTestArray(s), + { + std::make_shared(std::max(1ul, s/2), std::max(2ul, s/4), 0.5), + std::make_shared(0, 4, 1.0), + }, + {} + ); + + testCascadeBufferRedability(makeTestArray(max_s), + { + std::make_shared(s, 1, 2.0) + }, + { + [=] (auto prev) { return std::make_shared(max_s - s, 1, 2.0); } + } + ); + + testCascadeBufferRedability(makeTestArray(max_s), + {}, + { + [=] (auto prev) { return std::make_shared(max_s - s, 1, 2.0); }, + [=] (auto prev) { return std::make_shared(s, 1, 2.0); } + } + ); + } +} +catch (const DB::Exception & e) +{ + std::cerr << getCurrentExceptionMessage(true) << "\n"; + throw; +} +catch (...) 
+{ + throw; +} + + +TEST(TemporaryFileWriteBuffer, WriteAndReread) +try +{ + for (size_t s = 0; s < 2500000; s += 500000) + { + std::string tmp_template = "tmp/TemporaryFileWriteBuffer.XXXXXX"; + std::string data = makeTestArray(s); + + auto buf = WriteBufferFromTemporaryFile::create(tmp_template); + buf->write(&data[0], data.size()); + + std::string tmp_filename = buf->getFileName(); + ASSERT_EQ(tmp_template.size(), tmp_filename.size()); + + auto reread_buf = buf->getReadBuffer(); + std::string decoded_data; + { + WriteBufferFromString wbuf_decode(decoded_data); + copyData(*reread_buf, wbuf_decode); + } + + ASSERT_EQ(data.size(), decoded_data.size()); + ASSERT_TRUE(data == decoded_data); + + buf.reset(); + reread_buf.reset(); + ASSERT_TRUE(!Poco::File(tmp_filename).exists()); + } +} +catch (...) +{ + std::cerr << getCurrentExceptionMessage(true) << "\n"; + throw; +} + + +TEST(CascadeWriteBuffer, RereadWithTemporaryFileWriteBuffer) +try +{ + const std::string tmp_template = "tmp/RereadWithTemporaryFileWriteBuffer.XXXXXX"; + + for (size_t s = 0; s < 4000000; s += 1000000) + { + testCascadeBufferRedability(makeTestArray(s), + {}, + { + [=] (auto prev) { return WriteBufferFromTemporaryFile::create(tmp_template); } + } + ); + + testCascadeBufferRedability(makeTestArray(s), + { + std::make_shared(std::max(1ul, s/3ul), 2, 1.5), + }, + { + [=] (auto prev) { return WriteBufferFromTemporaryFile::create(tmp_template); } + } + ); + } +} +catch (...) 
+{ + std::cerr << getCurrentExceptionMessage(true) << "\n"; + throw; +} diff --git a/dbms/src/Server/HTTPHandler.cpp b/dbms/src/Server/HTTPHandler.cpp index 6a507b65582..9f63d6af75f 100644 --- a/dbms/src/Server/HTTPHandler.cpp +++ b/dbms/src/Server/HTTPHandler.cpp @@ -1,9 +1,11 @@ #include #include +#include #include #include +#include #include #include @@ -13,7 +15,11 @@ #include #include #include +#include +#include #include +#include +#include #include @@ -36,6 +42,7 @@ namespace ErrorCodes extern const int CANNOT_PARSE_DATE; extern const int CANNOT_PARSE_DATETIME; extern const int CANNOT_PARSE_NUMBER; + extern const int CANNOT_OPEN_FILE; extern const int UNKNOWN_ELEMENT_IN_AST; extern const int UNKNOWN_TYPE_OF_AST_NODE; @@ -103,7 +110,8 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti return HTTPResponse::HTTP_REQUESTENTITYTOOLARGE; else if (exception_code == ErrorCodes::NOT_IMPLEMENTED) return HTTPResponse::HTTP_NOT_IMPLEMENTED; - else if (exception_code == ErrorCodes::SOCKET_TIMEOUT) + else if (exception_code == ErrorCodes::SOCKET_TIMEOUT || + exception_code == ErrorCodes::CANNOT_OPEN_FILE) return HTTPResponse::HTTP_SERVICE_UNAVAILABLE; return HTTPResponse::HTTP_INTERNAL_SERVER_ERROR; @@ -133,37 +141,6 @@ void HTTPHandler::processQuery( query_param += '\n'; - /// The client can pass a HTTP header indicating supported compression method (gzip or deflate). - String http_response_compression_methods = request.get("Accept-Encoding", ""); - bool client_supports_http_compression = false; - ZlibCompressionMethod http_response_compression_method {}; - - if (!http_response_compression_methods.empty()) - { - /// Both gzip and deflate are supported. If the client supports both, gzip is preferred. - /// NOTE parsing of the list of methods is slightly incorrect. 
- if (std::string::npos != http_response_compression_methods.find("gzip")) - { - client_supports_http_compression = true; - http_response_compression_method = ZlibCompressionMethod::Gzip; - } - else if (std::string::npos != http_response_compression_methods.find("deflate")) - { - client_supports_http_compression = true; - http_response_compression_method = ZlibCompressionMethod::Zlib; - } - } - - used_output.out = std::make_shared( - response, client_supports_http_compression, http_response_compression_method); - - /// Client can pass a 'compress' flag in the query string. In this case the query result is - /// compressed using internal algorithm. This is not reflected in HTTP headers. - if (parse(params.get("compress", "0"))) - used_output.out_maybe_compressed = std::make_shared(*used_output.out); - else - used_output.out_maybe_compressed = used_output.out; - /// User name and password can be passed using query parameters or using HTTP Basic auth (both methods are insecure). /// The user and password can be passed by headers (similar to X-Auth-*), which is used by load balancers to pass authentication information std::string user = request.get("X-ClickHouse-User", params.get("user", "default")); @@ -186,6 +163,96 @@ void HTTPHandler::processQuery( context.setUser(user, password, request.clientAddress(), quota_key); context.setCurrentQueryId(query_id); + + /// The client can pass a HTTP header indicating supported compression method (gzip or deflate). + String http_response_compression_methods = request.get("Accept-Encoding", ""); + bool client_supports_http_compression = false; + ZlibCompressionMethod http_response_compression_method {}; + + if (!http_response_compression_methods.empty()) + { + /// Both gzip and deflate are supported. If the client supports both, gzip is preferred. + /// NOTE parsing of the list of methods is slightly incorrect. 
+ if (std::string::npos != http_response_compression_methods.find("gzip")) + { + client_supports_http_compression = true; + http_response_compression_method = ZlibCompressionMethod::Gzip; + } + else if (std::string::npos != http_response_compression_methods.find("deflate")) + { + client_supports_http_compression = true; + http_response_compression_method = ZlibCompressionMethod::Zlib; + } + } + + /// Client can pass a 'compress' flag in the query string. In this case the query result is + /// compressed using internal algorithm. This is not reflected in HTTP headers. + bool internal_compression = params.getParsed("compress", false); + + size_t response_buffer_size = params.getParsed("buffer_size", DBMS_DEFAULT_BUFFER_SIZE); + response_buffer_size = response_buffer_size ? response_buffer_size : DBMS_DEFAULT_BUFFER_SIZE; + + auto response_raw = std::make_shared( + response, client_supports_http_compression, http_response_compression_method, response_buffer_size); + + WriteBufferPtr response_maybe_compressed; + + size_t result_buffer_memory_size = params.getParsed("result_buffer_size", 0); + bool use_memory_buffer = result_buffer_memory_size; + + bool result_buffer_overflow_to_disk = params.get("result_buffer_on_overflow", "http") == "disk"; + + if (use_memory_buffer) + { + ConcatWriteBuffer::WriteBufferPtrs concat_buffers1{ std::make_shared(result_buffer_memory_size) }; + ConcatWriteBuffer::WriteBufferConstructors concat_buffers2{}; + + if (result_buffer_overflow_to_disk) + { + std::string tmp_path_template = context.getTemporaryPath() + "http_buffers/" + escapeForFileName(user) + ".XXXXXX"; + + auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &) + { + return WriteBufferFromTemporaryFile::create(tmp_path_template); + }; + + concat_buffers2.emplace_back(std::move(create_tmp_disk_buffer)); + } + else + { + auto rewrite_memory_buffer_to_http_and_continue = [response_raw] (const WriteBufferPtr & prev_buf) + { + auto memory_write_buffer = 
typeid_cast(prev_buf.get()); + + if (!memory_write_buffer) + throw Exception("Memory buffer was not allocated", ErrorCodes::LOGICAL_ERROR); + + auto memory_read_buffer = memory_write_buffer->getReadBuffer(); + copyData(*memory_read_buffer, *response_raw); + + return response_raw; + }; + + concat_buffers2.emplace_back(rewrite_memory_buffer_to_http_and_continue); + } + + used_output.out = response_raw; + used_output.delayed_out_raw = std::make_shared(std::move(concat_buffers1), std::move(concat_buffers2)); + if (internal_compression) + used_output.delayed_out_maybe_compressed = std::make_shared(*used_output.delayed_out_raw); + else + used_output.delayed_out_maybe_compressed = used_output.delayed_out_raw; + used_output.out_maybe_compressed = used_output.delayed_out_maybe_compressed; + } + else + { + used_output.out = response_raw; + if (internal_compression) + used_output.out_maybe_compressed = std::make_shared(*response_raw); + else + used_output.out_maybe_compressed = response_raw; + } + std::unique_ptr in_param = std::make_unique(query_param); std::unique_ptr in_post_raw = std::make_unique(istr); @@ -268,7 +335,11 @@ void HTTPHandler::processQuery( auto readonly_before_query = limits.readonly; - for (Poco::Net::NameValueCollection::ConstIterator it = params.begin(); it != params.end(); ++it) + NameSet reserved_param_names{"query", "compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", + "buffer_size", "result_buffer_size", "result_buffer_on_overflow" + }; + + for (auto it = params.begin(); it != params.end(); ++it) { if (it->first == "database") { @@ -278,14 +349,7 @@ void HTTPHandler::processQuery( { context.setDefaultFormat(it->second); } - else if (it->first == "query" - || it->first == "compress" - || it->first == "decompress" - || it->first == "user" - || it->first == "password" - || it->first == "quota_key" - || it->first == "query_id" - || it->first == "stacktrace") + else if (reserved_param_names.find(it->first) != 
reserved_param_names.end()) { } else @@ -346,6 +410,46 @@ void HTTPHandler::processQuery( executeQuery(*in, *used_output.out_maybe_compressed, /* allow_into_outfile = */ false, context, [&response] (const String & content_type) { response.setContentType(content_type); }); + if (use_memory_buffer) + { + std::vector write_buffers; + used_output.delayed_out_raw->getResultBuffers(write_buffers); + + std::vector read_buffers; + ConcatReadBuffer::ReadBuffers read_buffers_raw_ptr; + for (auto & write_buf : write_buffers) + { + IReadableWriteBuffer * write_buf_concrete; + if (write_buf && (write_buf_concrete = dynamic_cast(write_buf.get()))) + { + read_buffers.emplace_back(write_buf_concrete->getReadBuffer()); + read_buffers_raw_ptr.emplace_back(read_buffers.back().get()); + } + } + + if (result_buffer_overflow_to_disk) + { + /// All results in concat buffer + if (read_buffers_raw_ptr.empty()) + throw Exception("There are no buffers to overwrite result into response", ErrorCodes::LOGICAL_ERROR); + + ConcatReadBuffer concat_read_buffer(read_buffers_raw_ptr); + copyData(concat_read_buffer, *used_output.out); + } + else + { + /// Results could be either in the first buffer, or they could be already pushed to response + if (!write_buffers.at(1)) + { + /// Results was not pushed to reponse + copyData(*read_buffers.at(0), *used_output.out); + } + } + + used_output.delayed_out_raw.reset(); + used_output.delayed_out_maybe_compressed.reset(); + } + /// Send HTTP headers with code 200 if no exception happened and the data is still not sent to /// the client. used_output.out->finalize(); @@ -435,16 +539,11 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne { tryLogCurrentException(log); - std::string exception_message = getCurrentExceptionMessage(with_stacktrace); - int exception_code = getCurrentExceptionCode(); - /** If exception is received from remote server, then stack trace is embedded in message. 
* If exception is thrown on local server, then stack trace is in separate field. */ - - auto embedded_stack_trace_pos = exception_message.find("Stack trace"); - if (std::string::npos != embedded_stack_trace_pos && !with_stacktrace) - exception_message.resize(embedded_stack_trace_pos); + std::string exception_message = getCurrentExceptionMessage(with_stacktrace, true); + int exception_code = getCurrentExceptionCode(); trySendExceptionToClient(exception_message, exception_code, request, response, used_output); } diff --git a/dbms/src/Server/HTTPHandler.h b/dbms/src/Server/HTTPHandler.h index 44e9f56220f..62d14fb6573 100644 --- a/dbms/src/Server/HTTPHandler.h +++ b/dbms/src/Server/HTTPHandler.h @@ -13,6 +13,8 @@ namespace DB { class WriteBufferFromHTTPServerResponse; +class CascadeWriteBuffer; + class HTTPHandler : public Poco::Net::HTTPRequestHandler { @@ -27,6 +29,9 @@ private: std::shared_ptr out; /// Used for sending response. Points to 'out', or to CompressedWriteBuffer(*out), depending on settings. std::shared_ptr out_maybe_compressed; + + std::shared_ptr delayed_out_raw; + std::shared_ptr delayed_out_maybe_compressed; }; Server & server; diff --git a/dbms/tests/unit_tests/CMakeLists.txt b/dbms/tests/unit_tests/CMakeLists.txt new file mode 100644 index 00000000000..22fc6059102 --- /dev/null +++ b/dbms/tests/unit_tests/CMakeLists.txt @@ -0,0 +1,14 @@ +# Google Test unit tests + +# gather all **/gtest*.cpp files with unit tests into single gtest executable. Not CMake-way +execute_process(COMMAND find ${CMAKE_SOURCE_DIR}/dbms -path "*/tests/gtest*.cpp" -printf "%p;" OUTPUT_VARIABLE sources_with_gtest) + +if (ENABLE_TESTS AND sources_with_gtest) + add_definitions(-DGTEST_HAS_POSIX_RE=0) + add_executable(unit_tests ${sources_with_gtest}) + target_link_libraries(unit_tests gtest_main dbms) + target_include_directories(unit_tests PRIVATE + ${ClickHouse_SOURCE_DIR}/dbms/include + ) + add_test(unit_tests unit_tests) +endif()