Merge branch 'master' into nyc-taxi-data

This commit is contained in:
Alexey Milovidov 2017-01-30 09:24:51 +03:00
commit 932346f0e2
8 changed files with 193 additions and 137 deletions

View File

@ -126,8 +126,8 @@ if (NOT APPLE)
endif ()
if (USE_STATIC_LIBRARIES AND NOT CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -no-pie")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -no-pie")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --no-pie")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --no-pie")
endif ()

View File

@ -40,7 +40,13 @@ public:
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;;
void insert(const Field & x) override;
void insertFrom(const IColumn & src, size_t n) override;
void insertDefault() override;
void insertDefault() override
{
nested_column->insertDefault();
getNullMap().push_back(1);
}
void popBack(size_t n) override;
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;

View File

@ -1,6 +1,14 @@
#pragma once
#include <Poco/Net/HTTPServerResponse.h>
namespace Poco
{
namespace Net
{
class HTTPServerResponse;
}
}
namespace DB
{

View File

@ -3,14 +3,9 @@
#include <experimental/optional>
#include <mutex>
#include <Poco/Net/HTTPServerResponse.h>
#include <DB/Common/Exception.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/ZlibDeflatingWriteBuffer.h>
#include <DB/IO/HTTPCommon.h>
#include <DB/Common/NetException.h>
@ -18,15 +13,18 @@
#include <DB/Core/Progress.h>
namespace DB
namespace Poco
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
namespace Net
{
class HTTPServerResponse;
}
}
namespace DB
{
/// The difference from WriteBufferFromOStream is that this buffer gets the underlying std::ostream
/// (using response.send()) only after data is flushed for the first time. This is needed in HTTP
/// servers to change some HTTP headers (e.g. response code) before any data is sent to the client
@ -71,126 +69,28 @@ private:
/// Must be called under locked mutex.
/// This method send headers, if this was not done already,
/// but not finish them with \r\n, allowing to send more headers subsequently.
void startSendHeaders()
{
if (!headers_started_sending)
{
headers_started_sending = true;
if (add_cors_header)
response.set("Access-Control-Allow-Origin", "*");
setResponseDefaultHeaders(response);
std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
}
}
void startSendHeaders();
/// This method finish headers with \r\n, allowing to start to send body.
void finishSendHeaders()
{
if (!headers_finished_sending)
{
headers_finished_sending = true;
void finishSendHeaders();
/// Send end of headers delimiter.
*response_header_ostr << "\r\n" << std::flush;
}
}
void nextImpl() override
{
{
std::lock_guard<std::mutex> lock(mutex);
startSendHeaders();
if (!out)
{
if (compress)
{
if (compression_method == ZlibCompressionMethod::Gzip)
*response_header_ostr << "Content-Encoding: gzip\r\n";
else if (compression_method == ZlibCompressionMethod::Zlib)
*response_header_ostr << "Content-Encoding: deflate\r\n";
else
throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse",
ErrorCodes::LOGICAL_ERROR);
/// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy.
out_raw.emplace(*response_body_ostr);
deflating_buf.emplace(out_raw.value(), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out = &deflating_buf.value();
}
else
{
out_raw.emplace(*response_body_ostr, working_buffer.size(), working_buffer.begin());
out = &out_raw.value();
}
}
finishSendHeaders();
}
out->position() = position();
out->next();
}
void nextImpl() override;
public:
WriteBufferFromHTTPServerResponse(
Poco::Net::HTTPServerResponse & response_,
bool compress_ = false, /// If true - set Content-Encoding header and compress the result.
ZlibCompressionMethod compression_method_ = ZlibCompressionMethod::Gzip,
size_t size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<WriteBuffer>(size), response(response_),
compress(compress_), compression_method(compression_method_) {}
size_t size = DBMS_DEFAULT_BUFFER_SIZE);
/// Writes progess in repeating HTTP headers.
void onProgress(const Progress & progress)
{
std::lock_guard<std::mutex> lock(mutex);
/// Cannot add new headers if body was started to send.
if (headers_finished_sending)
return;
accumulated_progress.incrementPiecewiseAtomically(progress);
if (progress_watch.elapsed() >= send_progress_interval_ms * 1000000)
{
progress_watch.restart();
/// Send all common headers before our special progress headers.
startSendHeaders();
std::string progress_string;
{
WriteBufferFromString progress_string_writer(progress_string);
accumulated_progress.writeJSON(progress_string_writer);
}
*response_header_ostr << "X-ClickHouse-Progress: " << progress_string << "\r\n" << std::flush;
}
}
void onProgress(const Progress & progress);
/// Send at least HTTP headers if no data has been sent yet.
/// Use after the data has possibly been sent and no error happened (and thus you do not plan
/// to change response HTTP code.
/// This method is idempotent.
void finalize()
{
if (offset())
{
next();
}
else
{
/// If no remaining data, just send headers.
std::lock_guard<std::mutex> lock(mutex);
startSendHeaders();
finishSendHeaders();
}
}
void finalize();
/// Turn compression on or off.
/// The setting has any effect only if HTTP headers haven't been sent yet.
@ -219,17 +119,7 @@ public:
send_progress_interval_ms = send_progress_interval_ms_;
}
~WriteBufferFromHTTPServerResponse()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
~WriteBufferFromHTTPServerResponse();
};
}

View File

@ -148,12 +148,6 @@ void ColumnNullable::insertFrom(const IColumn & src, size_t n)
getNullMap().push_back(src_concrete.getNullMap()[n]);
}
void ColumnNullable::insertDefault()
{
nested_column->insertDefault();
getNullMap().push_back(1);
}
void ColumnNullable::popBack(size_t n)
{
nested_column->popBack(n);

View File

@ -1,6 +1,8 @@
#include <DB/IO/HTTPCommon.h>
#include <Poco/Util/Application.h>
#include <Poco/Net/HTTPServerResponse.h>
namespace DB
{

View File

@ -0,0 +1,156 @@
#include <Poco/Net/HTTPServerResponse.h>
#include <DB/Common/Exception.h>
#include <DB/IO/WriteBufferFromHTTPServerResponse.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/HTTPCommon.h>
#include <DB/Common/NetException.h>
#include <DB/Common/Stopwatch.h>
#include <DB/Core/Progress.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void WriteBufferFromHTTPServerResponse::startSendHeaders()
{
if (!headers_started_sending)
{
headers_started_sending = true;
if (add_cors_header)
response.set("Access-Control-Allow-Origin", "*");
setResponseDefaultHeaders(response);
std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
}
}
void WriteBufferFromHTTPServerResponse::finishSendHeaders()
{
if (!headers_finished_sending)
{
headers_finished_sending = true;
/// Send end of headers delimiter.
*response_header_ostr << "\r\n" << std::flush;
}
}
void WriteBufferFromHTTPServerResponse::nextImpl()
{
{
std::lock_guard<std::mutex> lock(mutex);
startSendHeaders();
if (!out)
{
if (compress)
{
if (compression_method == ZlibCompressionMethod::Gzip)
*response_header_ostr << "Content-Encoding: gzip\r\n";
else if (compression_method == ZlibCompressionMethod::Zlib)
*response_header_ostr << "Content-Encoding: deflate\r\n";
else
throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse",
ErrorCodes::LOGICAL_ERROR);
/// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy.
out_raw.emplace(*response_body_ostr);
deflating_buf.emplace(out_raw.value(), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out = &deflating_buf.value();
}
else
{
out_raw.emplace(*response_body_ostr, working_buffer.size(), working_buffer.begin());
out = &out_raw.value();
}
}
finishSendHeaders();
}
out->position() = position();
out->next();
}
WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
Poco::Net::HTTPServerResponse & response_,
bool compress_,
ZlibCompressionMethod compression_method_,
size_t size)
: BufferWithOwnMemory<WriteBuffer>(size), response(response_),
compress(compress_), compression_method(compression_method_)
{
}
void WriteBufferFromHTTPServerResponse::onProgress(const Progress & progress)
{
std::lock_guard<std::mutex> lock(mutex);
/// Cannot add new headers if body was started to send.
if (headers_finished_sending)
return;
accumulated_progress.incrementPiecewiseAtomically(progress);
if (progress_watch.elapsed() >= send_progress_interval_ms * 1000000)
{
progress_watch.restart();
/// Send all common headers before our special progress headers.
startSendHeaders();
std::string progress_string;
{
WriteBufferFromString progress_string_writer(progress_string);
accumulated_progress.writeJSON(progress_string_writer);
}
*response_header_ostr << "X-ClickHouse-Progress: " << progress_string << "\r\n" << std::flush;
}
}
void WriteBufferFromHTTPServerResponse::finalize()
{
if (offset())
{
next();
}
else
{
/// If no remaining data, just send headers.
std::lock_guard<std::mutex> lock(mutex);
startSendHeaders();
finishSendHeaders();
}
}
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -1,6 +1,6 @@
#This strings autochanged from release_lib.sh :
set(VERSION_DESCRIBE v1.1.54134-testing)
set(VERSION_REVISION 54139)
set(VERSION_DESCRIBE v1.1.54140-testing)
set(VERSION_REVISION 54140)
#===end of autochange
set(VERSION_MAJOR 1)