Merge remote-tracking branch 'origin' into integration-2

This commit is contained in:
Yatsishin Ilya 2021-04-30 12:18:24 +03:00
commit 744fb93945
37 changed files with 182 additions and 153 deletions

View File

@ -159,17 +159,12 @@ void IBridge::initialize(Application & self)
if (port > 0xFFFF) if (port > 0xFFFF)
throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND); throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
http_timeout = config().getUInt("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT); http_timeout = config().getUInt64("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT);
max_server_connections = config().getUInt("max-server-connections", 1024); max_server_connections = config().getUInt("max-server-connections", 1024);
keep_alive_timeout = config().getUInt("keep-alive-timeout", 10); keep_alive_timeout = config().getUInt64("keep-alive-timeout", 10);
initializeTerminationAndSignalProcessing(); initializeTerminationAndSignalProcessing();
#if USE_ODBC
if (bridgeName() == "ODBCBridge")
Poco::Data::ODBC::Connector::registerConnector();
#endif
ServerApplication::initialize(self); // NOLINT ServerApplication::initialize(self); // NOLINT
} }

View File

@ -35,22 +35,15 @@ RUN curl -O https://clickhouse-builds.s3.yandex.net/utils/1/dpkg-deb \
RUN apt-get update \ RUN apt-get update \
&& apt-get install \ && apt-get install \
alien \ alien \
clang-10 \
clang-11 \ clang-11 \
clang-tidy-10 \
clang-tidy-11 \ clang-tidy-11 \
cmake \ cmake \
debhelper \ debhelper \
devscripts \ devscripts \
g++-9 \
gcc-9 \
gdb \ gdb \
git \ git \
gperf \ gperf \
lld-10 \
lld-11 \ lld-11 \
llvm-10 \
llvm-10-dev \
llvm-11 \ llvm-11 \
llvm-11-dev \ llvm-11-dev \
moreutils \ moreutils \

View File

@ -606,7 +606,7 @@ SELECT arrayReverseSort((x, y) -> -y, [4, 3, 5], [1, 2, 3]) AS res;
如果要获取数组中唯一项的列表可以使用arrayReducegroupUniqArrayarr 如果要获取数组中唯一项的列表可以使用arrayReducegroupUniqArrayarr
## arryjoin(arr) {#array-functions-join} ## arrayJoin(arr) {#array-functions-join}
一个特殊的功能。请参见[«ArrayJoin函数»](array-join.md#functions_arrayjoin)部分。 一个特殊的功能。请参见[«ArrayJoin函数»](array-join.md#functions_arrayjoin)部分。

View File

@ -43,15 +43,15 @@ ClickHouse中提供的其他联接类型:
Also the behavior of ClickHouse server for `ANY JOIN` operations depends on the [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys) setting. Also the behavior of ClickHouse server for `ANY JOIN` operations depends on the [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys) setting.
### ASOF加入使用 {#asof-join-usage} ### ASOF JOIN使用 {#asof-join-usage}
`ASOF JOIN` 当您需要连接没有完全匹配的记录时非常有用。 `ASOF JOIN` 当您需要连接没有完全匹配的记录时非常有用。
算法需要表中的特殊列。 本专栏: 该算法需要表中的特殊列。 该列需要满足:
- 必须包含有序序列。 - 必须包含有序序列。
- 可以是以下类型之一: [Int*UInt*](../../../sql-reference/data-types/int-uint.md), [浮动\*](../../../sql-reference/data-types/float.md), [日期](../../../sql-reference/data-types/date.md), [日期时间](../../../sql-reference/data-types/datetime.md), [十进制\*](../../../sql-reference/data-types/decimal.md). - 可以是以下类型之一: [Int*UInt*](../../../sql-reference/data-types/int-uint.md), [Float\*](../../../sql-reference/data-types/float.md), [Date](../../../sql-reference/data-types/date.md), [DateTime](../../../sql-reference/data-types/datetime.md), [Decimal\*](../../../sql-reference/data-types/decimal.md).
- 不能是唯一的列 `JOIN` - 不能是`JOIN`子句中唯一的列
语法 `ASOF JOIN ... ON`: 语法 `ASOF JOIN ... ON`:
@ -62,9 +62,9 @@ ASOF LEFT JOIN table_2
ON equi_cond AND closest_match_cond ON equi_cond AND closest_match_cond
``` ```
您可以使用任意数量的相等条件和恰好一个最接近的匹配条件。 例如, `SELECT count() FROM table_1 ASOF LEFT JOIN table_2 ON table_1.a == table_2.b AND table_2.t <= table_1.t`. 您可以使用任意数量的相等条件和一个且只有一个最接近的匹配条件。 例如, `SELECT count() FROM table_1 ASOF LEFT JOIN table_2 ON table_1.a == table_2.b AND table_2.t <= table_1.t`.
支持最接近匹配的条件: `>`, `>=`, `<`, `<=`. 支持最接近匹配的运算符: `>`, `>=`, `<`, `<=`.
语法 `ASOF JOIN ... USING`: 语法 `ASOF JOIN ... USING`:
@ -75,9 +75,9 @@ ASOF JOIN table_2
USING (equi_column1, ... equi_columnN, asof_column) USING (equi_column1, ... equi_columnN, asof_column)
``` ```
`ASOF JOIN` 用途 `equi_columnX` 对于加入平等和 `asof_column` 用于加入与最接近的比赛 `table_1.asof_column >= table_2.asof_column` 条件。 该 `asof_column` 列总是在最后一个 `USING` 条款 `table_1.asof_column >= table_2.asof_column` 中, `ASOF JOIN` 使用 `equi_columnX` 来进行条件匹配, `asof_column` 用于JOIN最接近匹配。 `asof_column` 列总是在最后一个 `USING` 条件中。
例如,请考虑下表: 例如,参考下表:
table_1 table_2 table_1 table_2
event | ev_time | user_id event | ev_time | user_id event | ev_time | user_id event | ev_time | user_id
@ -88,10 +88,10 @@ USING (equi_column1, ... equi_columnN, asof_column)
event_1_2 | 13:00 | 42 event_2_3 | 13:00 | 42 event_1_2 | 13:00 | 42 event_2_3 | 13:00 | 42
... ... ... ...
`ASOF JOIN` 可以从用户事件的时间戳 `table_1` 并找到一个事件 `table_2` 其中时间戳最接近事件的时间戳 `table_1` 对应于最接近的匹配条件。 如果可用,则相等的时间戳值是最接近的值。 在这里,该 `user_id` 列可用于连接相等和 `ev_time` 列可用于在最接近的匹配加入。 在我们的例子中, `event_1_1` 可以加入 `event_2_1``event_1_2` 可以加入 `event_2_3`,但是 `event_2_2` 不能加入 `ASOF JOIN`会从 `table_2` 中的用户事件时间戳找出和 `table_1` 中用户事件时间戳中最近的一个时间戳,来满足最接近匹配的条件。如果有得话,则相等的时间戳值是最接近的值。在此例中,`user_id` 列可用于条件匹配,`ev_time` 列可用于最接近匹配。在此例中,`event_1_1` 可以 JOIN `event_2_1``event_1_2` 可以JOIN `event_2_3`,但是 `event_2_2` 不能被JOIN
!!! note "注" !!! note "注"
`ASOF` 加入是 **不** 支持在 [加入我们](../../../engines/table-engines/special/join.md) 表引擎。 `ASOF JOIN`在 [JOIN](../../../engines/table-engines/special/join.md) 表引擎**不受** 支持
## 分布式联接 {#global-join} ## 分布式联接 {#global-join}

View File

@ -2252,30 +2252,27 @@ private:
return; return;
processed_rows += block.rows(); processed_rows += block.rows();
/// Even if all blocks are empty, we still need to initialize the output stream to write empty resultset.
initBlockOutputStream(block); initBlockOutputStream(block);
/// The header block containing zero rows was used to initialize /// The header block containing zero rows was used to initialize
/// block_out_stream, do not output it. /// block_out_stream, do not output it.
/// Also do not output too much data if we're fuzzing. /// Also do not output too much data if we're fuzzing.
if (block.rows() != 0 if (block.rows() == 0 || (query_fuzzer_runs != 0 && processed_rows >= 100))
&& (query_fuzzer_runs == 0 || processed_rows < 100)) return;
{
block_out_stream->write(block);
written_first_block = true;
}
bool clear_progress = false;
if (need_render_progress) if (need_render_progress)
clear_progress = std_out.offset() > 0;
if (clear_progress)
clearProgress(); clearProgress();
block_out_stream->write(block);
written_first_block = true;
/// Received data block is immediately displayed to the user. /// Received data block is immediately displayed to the user.
block_out_stream->flush(); block_out_stream->flush();
/// Restore progress bar after data block. /// Restore progress bar after data block.
if (clear_progress) if (need_render_progress)
writeProgress(); writeProgress();
} }

View File

@ -55,7 +55,7 @@ protected:
virtual Poco::Logger * getLog() const = 0; virtual Poco::Logger * getLog() const = 0;
virtual const Poco::Timespan & getHTTPTimeout() const = 0; virtual Poco::Timespan getHTTPTimeout() const = 0;
virtual Poco::URI createBaseURI() const = 0; virtual Poco::URI createBaseURI() const = 0;

View File

@ -28,7 +28,7 @@ LibraryBridgeHelper::LibraryBridgeHelper(
, log(&Poco::Logger::get("LibraryBridgeHelper")) , log(&Poco::Logger::get("LibraryBridgeHelper"))
, sample_block(sample_block_) , sample_block(sample_block_)
, config(context_->getConfigRef()) , config(context_->getConfigRef())
, http_timeout(context_->getSettingsRef().http_receive_timeout.value.totalSeconds()) , http_timeout(context_->getSettingsRef().http_receive_timeout.value)
, dictionary_id(dictionary_id_) , dictionary_id(dictionary_id_)
{ {
bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT); bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT);

View File

@ -57,7 +57,7 @@ protected:
Poco::Logger * getLog() const override { return log; } Poco::Logger * getLog() const override { return log; }
const Poco::Timespan & getHTTPTimeout() const override { return http_timeout; } Poco::Timespan getHTTPTimeout() const override { return http_timeout; }
Poco::URI createBaseURI() const override; Poco::URI createBaseURI() const override;

View File

@ -62,9 +62,9 @@ public:
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed"; static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
XDBCBridgeHelper( XDBCBridgeHelper(
ContextPtr global_context_, ContextPtr global_context_,
const Poco::Timespan & http_timeout_, Poco::Timespan http_timeout_,
const std::string & connection_string_) const std::string & connection_string_)
: IXDBCBridgeHelper(global_context_) : IXDBCBridgeHelper(global_context_)
, log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper")) , log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
, connection_string(connection_string_) , connection_string(connection_string_)
@ -90,7 +90,7 @@ protected:
String configPrefix() const override { return BridgeHelperMixin::configPrefix(); } String configPrefix() const override { return BridgeHelperMixin::configPrefix(); }
const Poco::Timespan & getHTTPTimeout() const override { return http_timeout; } Poco::Timespan getHTTPTimeout() const override { return http_timeout; }
const Poco::Util::AbstractConfiguration & getConfig() const override { return config; } const Poco::Util::AbstractConfiguration & getConfig() const override { return config; }
@ -118,7 +118,7 @@ private:
Poco::Logger * log; Poco::Logger * log;
std::string connection_string; std::string connection_string;
const Poco::Timespan & http_timeout; Poco::Timespan http_timeout;
std::string bridge_host; std::string bridge_host;
size_t bridge_port; size_t bridge_port;

View File

@ -116,7 +116,7 @@ ConnectionEstablisherAsync::ConnectionEstablisherAsync(
epoll.add(receive_timeout.getDescriptor()); epoll.add(receive_timeout.getDescriptor());
} }
void ConnectionEstablisherAsync::Routine::ReadCallback::operator()(int fd, const Poco::Timespan & timeout, const std::string &) void ConnectionEstablisherAsync::Routine::ReadCallback::operator()(int fd, Poco::Timespan timeout, const std::string &)
{ {
/// Check if it's the first time and we need to add socket fd to epoll. /// Check if it's the first time and we need to add socket fd to epoll.
if (connection_establisher_async.socket_fd == -1) if (connection_establisher_async.socket_fd == -1)

View File

@ -92,7 +92,7 @@ private:
ConnectionEstablisherAsync & connection_establisher_async; ConnectionEstablisherAsync & connection_establisher_async;
Fiber & fiber; Fiber & fiber;
void operator()(int fd, const Poco::Timespan & timeout, const std::string &); void operator()(int fd, Poco::Timespan timeout, const std::string &);
}; };
Fiber operator()(Fiber && sink); Fiber operator()(Fiber && sink);

View File

@ -98,7 +98,7 @@ private:
PacketReceiver & receiver; PacketReceiver & receiver;
Fiber & sink; Fiber & sink;
void operator()(int, const Poco::Timespan & timeout, const std::string &) void operator()(int, Poco::Timespan timeout, const std::string &)
{ {
receiver.receive_timeout.setRelative(timeout); receiver.receive_timeout.setRelative(timeout);
receiver.is_read_in_process = true; receiver.is_read_in_process = true;

View File

@ -9,7 +9,7 @@
namespace DB namespace DB
{ {
using AsyncCallback = std::function<void(int, const Poco::Timespan &, const std::string &)>; using AsyncCallback = std::function<void(int, Poco::Timespan, const std::string &)>;
class Epoll class Epoll
{ {

View File

@ -74,7 +74,7 @@ void TimerDescriptor::drain() const
} }
} }
void TimerDescriptor::setRelative(const Poco::Timespan & timespan) const void TimerDescriptor::setRelative(Poco::Timespan timespan) const
{ {
itimerspec spec; itimerspec spec;
spec.it_interval.tv_nsec = 0; spec.it_interval.tv_nsec = 0;

View File

@ -24,7 +24,7 @@ public:
void reset() const; void reset() const;
void drain() const; void drain() const;
void setRelative(const Poco::Timespan & timespan) const; void setRelative(Poco::Timespan timespan) const;
}; };
} }

View File

@ -101,7 +101,7 @@ struct SettingFieldTimespan
Poco::Timespan value; Poco::Timespan value;
bool changed = false; bool changed = false;
explicit SettingFieldTimespan(const Poco::Timespan & x = {}) : value(x) {} explicit SettingFieldTimespan(Poco::Timespan x = {}) : value(x) {}
template <class Rep, class Period = std::ratio<1>> template <class Rep, class Period = std::ratio<1>>
explicit SettingFieldTimespan(const std::chrono::duration<Rep, Period> & x) explicit SettingFieldTimespan(const std::chrono::duration<Rep, Period> & x)
@ -110,7 +110,7 @@ struct SettingFieldTimespan
explicit SettingFieldTimespan(UInt64 x) : SettingFieldTimespan(Poco::Timespan{static_cast<Poco::Timespan::TimeDiff>(x * microseconds_per_unit)}) {} explicit SettingFieldTimespan(UInt64 x) : SettingFieldTimespan(Poco::Timespan{static_cast<Poco::Timespan::TimeDiff>(x * microseconds_per_unit)}) {}
explicit SettingFieldTimespan(const Field & f); explicit SettingFieldTimespan(const Field & f);
SettingFieldTimespan & operator =(const Poco::Timespan & x) { value = x; changed = true; return *this; } SettingFieldTimespan & operator =(Poco::Timespan x) { value = x; changed = true; return *this; }
template <class Rep, class Period = std::ratio<1>> template <class Rep, class Period = std::ratio<1>>
SettingFieldTimespan & operator =(const std::chrono::duration<Rep, Period> & x) { *this = Poco::Timespan{static_cast<Poco::Timespan::TimeDiff>(std::chrono::duration_cast<std::chrono::microseconds>(x).count())}; return *this; } SettingFieldTimespan & operator =(const std::chrono::duration<Rep, Period> & x) { *this = Poco::Timespan{static_cast<Poco::Timespan::TimeDiff>(std::chrono::duration_cast<std::chrono::microseconds>(x).count())}; return *this; }

View File

@ -19,7 +19,7 @@ struct RemoteQueryExecutorRoutine
RemoteQueryExecutorReadContext & read_context; RemoteQueryExecutorReadContext & read_context;
Fiber & fiber; Fiber & fiber;
void operator()(int fd, const Poco::Timespan & timeout = 0, const std::string fd_description = "") void operator()(int fd, Poco::Timespan timeout = 0, const std::string fd_description = "")
{ {
try try
{ {
@ -89,7 +89,7 @@ RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(IConnections & co
fiber = boost::context::fiber(std::allocator_arg_t(), stack, std::move(routine)); fiber = boost::context::fiber(std::allocator_arg_t(), stack, std::move(routine));
} }
void RemoteQueryExecutorReadContext::setConnectionFD(int fd, const Poco::Timespan & timeout, const std::string & fd_description) void RemoteQueryExecutorReadContext::setConnectionFD(int fd, Poco::Timespan timeout, const std::string & fd_description)
{ {
if (fd == connection_fd) if (fd == connection_fd)
return; return;

View File

@ -58,7 +58,7 @@ public:
bool checkTimeout(bool blocking = false); bool checkTimeout(bool blocking = false);
bool checkTimeoutImpl(bool blocking); bool checkTimeoutImpl(bool blocking);
void setConnectionFD(int fd, const Poco::Timespan & timeout = 0, const std::string & fd_description = ""); void setConnectionFD(int fd, Poco::Timespan timeout = 0, const std::string & fd_description = "");
void setTimer() const; void setTimer() const;
bool resumeRoutine(); bool resumeRoutine();

View File

@ -24,9 +24,9 @@ struct ConnectionTimeouts
ConnectionTimeouts() = default; ConnectionTimeouts() = default;
ConnectionTimeouts(const Poco::Timespan & connection_timeout_, ConnectionTimeouts(Poco::Timespan connection_timeout_,
const Poco::Timespan & send_timeout_, Poco::Timespan send_timeout_,
const Poco::Timespan & receive_timeout_) Poco::Timespan receive_timeout_)
: connection_timeout(connection_timeout_), : connection_timeout(connection_timeout_),
send_timeout(send_timeout_), send_timeout(send_timeout_),
receive_timeout(receive_timeout_), receive_timeout(receive_timeout_),
@ -38,10 +38,10 @@ struct ConnectionTimeouts
{ {
} }
ConnectionTimeouts(const Poco::Timespan & connection_timeout_, ConnectionTimeouts(Poco::Timespan connection_timeout_,
const Poco::Timespan & send_timeout_, Poco::Timespan send_timeout_,
const Poco::Timespan & receive_timeout_, Poco::Timespan receive_timeout_,
const Poco::Timespan & tcp_keep_alive_timeout_) Poco::Timespan tcp_keep_alive_timeout_)
: connection_timeout(connection_timeout_), : connection_timeout(connection_timeout_),
send_timeout(send_timeout_), send_timeout(send_timeout_),
receive_timeout(receive_timeout_), receive_timeout(receive_timeout_),
@ -52,11 +52,11 @@ struct ConnectionTimeouts
receive_data_timeout(receive_timeout_) receive_data_timeout(receive_timeout_)
{ {
} }
ConnectionTimeouts(const Poco::Timespan & connection_timeout_, ConnectionTimeouts(Poco::Timespan connection_timeout_,
const Poco::Timespan & send_timeout_, Poco::Timespan send_timeout_,
const Poco::Timespan & receive_timeout_, Poco::Timespan receive_timeout_,
const Poco::Timespan & tcp_keep_alive_timeout_, Poco::Timespan tcp_keep_alive_timeout_,
const Poco::Timespan & http_keep_alive_timeout_) Poco::Timespan http_keep_alive_timeout_)
: connection_timeout(connection_timeout_), : connection_timeout(connection_timeout_),
send_timeout(send_timeout_), send_timeout(send_timeout_),
receive_timeout(receive_timeout_), receive_timeout(receive_timeout_),
@ -68,14 +68,14 @@ struct ConnectionTimeouts
{ {
} }
ConnectionTimeouts(const Poco::Timespan & connection_timeout_, ConnectionTimeouts(Poco::Timespan connection_timeout_,
const Poco::Timespan & send_timeout_, Poco::Timespan send_timeout_,
const Poco::Timespan & receive_timeout_, Poco::Timespan receive_timeout_,
const Poco::Timespan & tcp_keep_alive_timeout_, Poco::Timespan tcp_keep_alive_timeout_,
const Poco::Timespan & http_keep_alive_timeout_, Poco::Timespan http_keep_alive_timeout_,
const Poco::Timespan & secure_connection_timeout_, Poco::Timespan secure_connection_timeout_,
const Poco::Timespan & receive_hello_timeout_, Poco::Timespan receive_hello_timeout_,
const Poco::Timespan & receive_data_timeout_) Poco::Timespan receive_data_timeout_)
: connection_timeout(connection_timeout_), : connection_timeout(connection_timeout_),
send_timeout(send_timeout_), send_timeout(send_timeout_),
receive_timeout(receive_timeout_), receive_timeout(receive_timeout_),
@ -87,7 +87,7 @@ struct ConnectionTimeouts
{ {
} }
static Poco::Timespan saturate(const Poco::Timespan & timespan, const Poco::Timespan & limit) static Poco::Timespan saturate(Poco::Timespan timespan, Poco::Timespan limit)
{ {
if (limit.totalMicroseconds() == 0) if (limit.totalMicroseconds() == 0)
return timespan; return timespan;
@ -95,7 +95,7 @@ struct ConnectionTimeouts
return (timespan > limit) ? limit : timespan; return (timespan > limit) ? limit : timespan;
} }
ConnectionTimeouts getSaturated(const Poco::Timespan & limit) const ConnectionTimeouts getSaturated(Poco::Timespan limit) const
{ {
return ConnectionTimeouts(saturate(connection_timeout, limit), return ConnectionTimeouts(saturate(connection_timeout, limit),
saturate(send_timeout, limit), saturate(send_timeout, limit),

View File

@ -8,7 +8,7 @@
namespace DB namespace DB
{ {
using AsyncCallback = std::function<void(int, const Poco::Timespan &, const std::string &)>; using AsyncCallback = std::function<void(int, Poco::Timespan, const std::string &)>;
/// Works with the ready Poco::Net::Socket. Blocking operations. /// Works with the ready Poco::Net::Socket. Blocking operations.
class ReadBufferFromPocoSocket : public BufferWithOwnMemory<ReadBuffer> class ReadBufferFromPocoSocket : public BufferWithOwnMemory<ReadBuffer>

View File

@ -7,8 +7,8 @@ namespace DB
{ {
TimeoutSetter::TimeoutSetter(Poco::Net::StreamSocket & socket_, TimeoutSetter::TimeoutSetter(Poco::Net::StreamSocket & socket_,
const Poco::Timespan & send_timeout_, Poco::Timespan send_timeout_,
const Poco::Timespan & receive_timeout_, Poco::Timespan receive_timeout_,
bool limit_max_timeout) bool limit_max_timeout)
: socket(socket_), send_timeout(send_timeout_), receive_timeout(receive_timeout_) : socket(socket_), send_timeout(send_timeout_), receive_timeout(receive_timeout_)
{ {
@ -22,7 +22,7 @@ TimeoutSetter::TimeoutSetter(Poco::Net::StreamSocket & socket_,
socket.setReceiveTimeout(receive_timeout); socket.setReceiveTimeout(receive_timeout);
} }
TimeoutSetter::TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_, bool limit_max_timeout) TimeoutSetter::TimeoutSetter(Poco::Net::StreamSocket & socket_, Poco::Timespan timeout_, bool limit_max_timeout)
: TimeoutSetter(socket_, timeout_, timeout_, limit_max_timeout) : TimeoutSetter(socket_, timeout_, timeout_, limit_max_timeout)
{ {
} }

View File

@ -11,11 +11,11 @@ namespace DB
struct TimeoutSetter struct TimeoutSetter
{ {
TimeoutSetter(Poco::Net::StreamSocket & socket_, TimeoutSetter(Poco::Net::StreamSocket & socket_,
const Poco::Timespan & send_timeout_, Poco::Timespan send_timeout_,
const Poco::Timespan & receive_timeout_, Poco::Timespan receive_timeout_,
bool limit_max_timeout = false); bool limit_max_timeout = false);
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_, bool limit_max_timeout = false); TimeoutSetter(Poco::Net::StreamSocket & socket_, Poco::Timespan timeout_, bool limit_max_timeout = false);
~TimeoutSetter(); ~TimeoutSetter();

View File

@ -541,7 +541,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
} }
Poco::Timespan Cluster::saturate(const Poco::Timespan & v, const Poco::Timespan & limit) Poco::Timespan Cluster::saturate(Poco::Timespan v, Poco::Timespan limit)
{ {
if (limit.totalMicroseconds() == 0) if (limit.totalMicroseconds() == 0)
return v; return v;

View File

@ -52,7 +52,7 @@ public:
Cluster & operator=(const Cluster &) = delete; Cluster & operator=(const Cluster &) = delete;
/// is used to set a limit on the size of the timeout /// is used to set a limit on the size of the timeout
static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit); static Poco::Timespan saturate(Poco::Timespan v, Poco::Timespan limit);
public: public:
using SlotToShard = std::vector<UInt64>; using SlotToShard = std::vector<UInt64>;

View File

@ -42,7 +42,7 @@ public:
/// Load configuration from some concrete source to AbstractConfiguration /// Load configuration from some concrete source to AbstractConfiguration
virtual LoadablesConfigurationPtr load(const std::string & path) = 0; virtual LoadablesConfigurationPtr load(const std::string & path) = 0;
virtual ~IExternalLoaderConfigRepository() {} virtual ~IExternalLoaderConfigRepository() = default;
}; };
} }

View File

@ -475,7 +475,7 @@ void TCPHandler::runImpl()
} }
bool TCPHandler::readDataNext(const size_t & poll_interval, const int & receive_timeout) bool TCPHandler::readDataNext(size_t poll_interval, time_t receive_timeout)
{ {
Stopwatch watch(CLOCK_MONOTONIC_COARSE); Stopwatch watch(CLOCK_MONOTONIC_COARSE);
@ -493,8 +493,8 @@ bool TCPHandler::readDataNext(const size_t & poll_interval, const int & receive_
* If we periodically poll, the receive_timeout of the socket itself does not work. * If we periodically poll, the receive_timeout of the socket itself does not work.
* Therefore, an additional check is added. * Therefore, an additional check is added.
*/ */
double elapsed = watch.elapsedSeconds(); Float64 elapsed = watch.elapsedSeconds();
if (elapsed > receive_timeout) if (elapsed > static_cast<Float64>(receive_timeout))
{ {
throw Exception(ErrorCodes::SOCKET_TIMEOUT, throw Exception(ErrorCodes::SOCKET_TIMEOUT,
"Timeout exceeded while receiving data from client. Waited for {} seconds, timeout is {} seconds.", "Timeout exceeded while receiving data from client. Waited for {} seconds, timeout is {} seconds.",
@ -535,10 +535,7 @@ std::tuple<size_t, int> TCPHandler::getReadTimeouts(const Settings & connection_
void TCPHandler::readData(const Settings & connection_settings) void TCPHandler::readData(const Settings & connection_settings)
{ {
size_t poll_interval; auto [poll_interval, receive_timeout] = getReadTimeouts(connection_settings);
int receive_timeout;
std::tie(poll_interval, receive_timeout) = getReadTimeouts(connection_settings);
sendLogs(); sendLogs();
while (readDataNext(poll_interval, receive_timeout)) while (readDataNext(poll_interval, receive_timeout))

View File

@ -174,7 +174,7 @@ private:
void receiveIgnoredPartUUIDs(); void receiveIgnoredPartUUIDs();
String receiveReadTaskResponseAssumeLocked(); String receiveReadTaskResponseAssumeLocked();
bool receiveData(bool scalar); bool receiveData(bool scalar);
bool readDataNext(const size_t & poll_interval, const int & receive_timeout); bool readDataNext(size_t poll_interval, time_t receive_timeout);
void readData(const Settings & connection_settings); void readData(const Settings & connection_settings);
void receiveClusterNameAndSalt(); void receiveClusterNameAndSalt();
std::tuple<size_t, int> getReadTimeouts(const Settings & connection_settings); std::tuple<size_t, int> getReadTimeouts(const Settings & connection_settings);

View File

@ -122,11 +122,10 @@ public:
current_path = uri + path; current_path = uri + path;
auto compression = chooseCompressionMethod(path, compression_method); auto compression = chooseCompressionMethod(path, compression_method);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri, path, getContext()->getGlobalContext()->getConfigRef()), compression); read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri, path, getContext()->getGlobalContext()->getConfigRef()), compression);
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size); auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size);
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf)); reader = std::make_shared<InputStreamFromInputFormat>(input_format);
reader->readPrefix(); reader->readPrefix();
} }
@ -156,10 +155,12 @@ public:
reader->readSuffix(); reader->readSuffix();
reader.reset(); reader.reset();
read_buf.reset();
} }
} }
private: private:
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader; BlockInputStreamPtr reader;
SourcesInfoPtr source_info; SourcesInfoPtr source_info;
String uri; String uri;

View File

@ -1542,33 +1542,38 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
} }
DataPartsVector parts; DataPartsVector parts;
bool have_all_parts = true;
for (const String & name : entry.source_parts) for (const String & source_part_name : entry.source_parts)
{ {
DataPartPtr part = getActiveContainingPart(name); DataPartPtr source_part_or_covering = getActiveContainingPart(source_part_name);
if (!part) if (!source_part_or_covering)
{ {
have_all_parts = false; /// We do not have one of source parts locally, try to take some already merged part from someone.
break; LOG_DEBUG(log, "Don't have all parts for merge {}; will try to fetch it instead", entry.new_part_name);
return false;
} }
if (part->name != name)
if (source_part_or_covering->name != source_part_name)
{ {
LOG_WARNING(log, "Part {} is covered by {} but should be merged into {}. This shouldn't happen often.", name, part->name, entry.new_part_name); /// We do not have source part locally, but we have some covering part. Possible options:
have_all_parts = false; /// 1. We already have merged part (source_part_or_covering->name == new_part_name)
break; /// 2. We have some larger merged part which covers new_part_name (and therefore it covers source_part_name too)
/// 3. We have two intersecting parts, both cover source_part_name. It's logical error.
/// TODO Why 1 and 2 can happen? Do we need more assertions here or somewhere else?
constexpr const char * message = "Part {} is covered by {} but should be merged into {}. This shouldn't happen often.";
LOG_WARNING(log, message, source_part_name, source_part_or_covering->name, entry.new_part_name);
if (!source_part_or_covering->info.contains(MergeTreePartInfo::fromPartName(entry.new_part_name, format_version)))
throw Exception(ErrorCodes::LOGICAL_ERROR, message, source_part_name, source_part_or_covering->name, entry.new_part_name);
return false;
} }
parts.push_back(part);
parts.push_back(source_part_or_covering);
} }
if (!have_all_parts) /// All source parts are found locally, we can execute merge
{
/// If you do not have all the necessary parts, try to take some already merged part from someone. if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
LOG_DEBUG(log, "Don't have all parts for merge {}; will try to fetch it instead", entry.new_part_name);
return false;
}
else if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
{ {
/// If entry is old enough, and have enough size, and part are exists in any replica, /// If entry is old enough, and have enough size, and part are exists in any replica,
/// then prefer fetching of merged part from replica. /// then prefer fetching of merged part from replica.

View File

@ -22,7 +22,7 @@ private:
/* A factory method to create bridge helper, that will assist in remote interaction */ /* A factory method to create bridge helper, that will assist in remote interaction */
virtual BridgeHelperPtr createBridgeHelper(ContextPtr context, virtual BridgeHelperPtr createBridgeHelper(ContextPtr context,
const Poco::Timespan & http_timeout_, Poco::Timespan http_timeout_,
const std::string & connection_string_) const = 0; const std::string & connection_string_) const = 0;
ColumnsDescription getActualTableStructure(ContextPtr context) const override; ColumnsDescription getActualTableStructure(ContextPtr context) const override;
@ -48,7 +48,7 @@ public:
private: private:
BridgeHelperPtr createBridgeHelper(ContextPtr context, BridgeHelperPtr createBridgeHelper(ContextPtr context,
const Poco::Timespan & http_timeout_, Poco::Timespan http_timeout_,
const std::string & connection_string_) const override const std::string & connection_string_) const override
{ {
return std::make_shared<XDBCBridgeHelper<JDBCBridgeMixin>>(context, http_timeout_, connection_string_); return std::make_shared<XDBCBridgeHelper<JDBCBridgeMixin>>(context, http_timeout_, connection_string_);
@ -68,7 +68,7 @@ public:
private: private:
BridgeHelperPtr createBridgeHelper(ContextPtr context, BridgeHelperPtr createBridgeHelper(ContextPtr context,
const Poco::Timespan & http_timeout_, Poco::Timespan http_timeout_,
const std::string & connection_string_) const override const std::string & connection_string_) const override
{ {
return std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(context, http_timeout_, connection_string_); return std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(context, http_timeout_, connection_string_);

View File

@ -41,14 +41,14 @@
</replica> </replica>
</shard> </shard>
</shard_with_local_replica_internal_replication> </shard_with_local_replica_internal_replication>
<shard_with_low_cardinality> <shard_with_low_cardinality>
<shard> <shard>
<replica> <replica>
<host>shard1</host> <host>shard1</host>
<port>9000</port> <port>9000</port>
</replica> </replica>
</shard> </shard>
<shard> <shard>
<replica> <replica>
<host>shard2</host> <host>shard2</host>
<port>9000</port> <port>9000</port>

View File

@ -1,6 +1,7 @@
import time import time
import pytest import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager from helpers.network import PartitionManager
from helpers.test_tools import TSV from helpers.test_tools import TSV
@ -76,11 +77,19 @@ CREATE TABLE table_function (n UInt8, s String) ENGINE = MergeTree() ORDER BY n'
CREATE TABLE table_function (n UInt8, s String) ENGINE = MergeTree() ORDER BY n''') CREATE TABLE table_function (n UInt8, s String) ENGINE = MergeTree() ORDER BY n''')
node1.query(''' node1.query('''
CREATE TABLE distributed_one_replica (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica_internal_replication', 'default', 'single_replicated') CREATE TABLE distributed_one_replica_internal_replication (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica_internal_replication', 'default', 'single_replicated')
''') ''')
node2.query(''' node2.query('''
CREATE TABLE distributed_one_replica (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica_internal_replication', 'default', 'single_replicated') CREATE TABLE distributed_one_replica_internal_replication (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica_internal_replication', 'default', 'single_replicated')
''')
node1.query('''
CREATE TABLE distributed_one_replica_no_internal_replication (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica', 'default', 'single_replicated')
''')
node2.query('''
CREATE TABLE distributed_one_replica_no_internal_replication (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica', 'default', 'single_replicated')
''') ''')
node2.query(''' node2.query('''
@ -174,14 +183,45 @@ def test_inserts_local(started_cluster):
assert instance.query("SELECT count(*) FROM local").strip() == '1' assert instance.query("SELECT count(*) FROM local").strip() == '1'
def test_inserts_single_replica(started_cluster): def test_inserts_single_replica_local_internal_replication(started_cluster):
with pytest.raises(QueryRuntimeException, match="Table default.single_replicated doesn't exist"):
node1.query(
"INSERT INTO distributed_one_replica_internal_replication VALUES ('2000-01-01', 1)",
settings={
"insert_distributed_sync": "1",
"prefer_localhost_replica": "1",
# to make the test more deterministic
"load_balancing": "first_or_random",
},
)
assert node2.query("SELECT count(*) FROM single_replicated").strip() == '0'
def test_inserts_single_replica_internal_replication(started_cluster):
node1.query( node1.query(
"INSERT INTO distributed_one_replica VALUES ('2000-01-01', 1)", "INSERT INTO distributed_one_replica_internal_replication VALUES ('2000-01-01', 1)",
settings={"insert_distributed_sync": "1", "prefer_localhost_replica": "0"}, settings={
"insert_distributed_sync": "1",
"prefer_localhost_replica": "0",
# to make the test more deterministic
"load_balancing": "first_or_random",
},
) )
assert node2.query("SELECT count(*) FROM single_replicated").strip() == '1' assert node2.query("SELECT count(*) FROM single_replicated").strip() == '1'
def test_inserts_single_replica_no_internal_replication(started_cluster):
with pytest.raises(QueryRuntimeException, match="Table default.single_replicated doesn't exist"):
node1.query(
"INSERT INTO distributed_one_replica_no_internal_replication VALUES ('2000-01-01', 1)",
settings={
"insert_distributed_sync": "1",
"prefer_localhost_replica": "0",
},
)
assert node2.query("SELECT count(*) FROM single_replicated").strip() == '1'
def test_prefer_localhost_replica(started_cluster): def test_prefer_localhost_replica(started_cluster):
test_query = "SELECT * FROM distributed ORDER BY id" test_query = "SELECT * FROM distributed ORDER BY id"

View File

@ -30,7 +30,6 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
client1.send('CREATE LIVE VIEW test.lv AS SELECT toStartOfDay(time) AS day, location, avg(temperature) FROM test.mt GROUP BY day, location ORDER BY day, location') client1.send('CREATE LIVE VIEW test.lv AS SELECT toStartOfDay(time) AS day, location, avg(temperature) FROM test.mt GROUP BY day, location ORDER BY day, location')
client1.expect(prompt) client1.expect(prompt)
client1.send('WATCH test.lv FORMAT CSVWithNames') client1.send('WATCH test.lv FORMAT CSVWithNames')
client1.expect(r'_version')
client2.send("INSERT INTO test.mt VALUES ('2019-01-01 00:00:00','New York',60),('2019-01-01 00:10:00','New York',70)") client2.send("INSERT INTO test.mt VALUES ('2019-01-01 00:00:00','New York',60),('2019-01-01 00:10:00','New York',70)")
client2.expect(prompt) client2.expect(prompt)
client1.expect(r'"2019-01-01 00:00:00","New York",65') client1.expect(r'"2019-01-01 00:00:00","New York",65')
@ -60,7 +59,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
match = client1.expect('(%s)|([#\$] )' % prompt) match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]: if match.groups()[1]:
client1.send(client1.command) client1.send(client1.command)
client1.expect(prompt) client1.expect(prompt)
client1.send('DROP TABLE test.lv') client1.send('DROP TABLE test.lv')
client1.expect(prompt) client1.expect(prompt)
client1.send('DROP TABLE test.mt') client1.send('DROP TABLE test.mt')

View File

@ -1,20 +0,0 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists huge_strings"
$CLICKHOUSE_CLIENT -q "create table huge_strings (n UInt64, l UInt64, s String, h UInt64) engine=MergeTree order by n"
for _ in {1..10}; do
$CLICKHOUSE_CLIENT -q "select number, (rand() % 100*1000*1000) as l, repeat(randomString(l/1000/1000), 1000*1000) as s, cityHash64(s) from numbers(10) format Values" | $CLICKHOUSE_CLIENT -q "insert into huge_strings values" &
$CLICKHOUSE_CLIENT -q "select number % 10, (rand() % 100) as l, randomString(l) as s, cityHash64(s) from numbers(100000)" | $CLICKHOUSE_CLIENT -q "insert into huge_strings format TSV" &
done;
wait
$CLICKHOUSE_CLIENT -q "select count() from huge_strings"
$CLICKHOUSE_CLIENT -q "select sum(l = length(s)) from huge_strings"
$CLICKHOUSE_CLIENT -q "select sum(h = cityHash64(s)) from huge_strings"
$CLICKHOUSE_CLIENT -q "drop table huge_strings"

View File

@ -0,0 +1,22 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists huge_strings"
$CLICKHOUSE_CLIENT -q "create table huge_strings (n UInt64, l UInt64, s String, h UInt64) engine=MergeTree order by n"
# Timeouts are increased, because test can be slow with sanitizers and parallel runs.
for _ in {1..10}; do
$CLICKHOUSE_CLIENT --receive_timeout 100 --send_timeout 100 --connect_timeout 100 --query "select number, (rand() % 10*1000*1000) as l, repeat(randomString(l/1000/1000), 1000*1000) as s, cityHash64(s) from numbers(10) format Values" | $CLICKHOUSE_CLIENT --receive_timeout 100 --send_timeout 100 --connect_timeout 100 --query "insert into huge_strings values" &
$CLICKHOUSE_CLIENT --receive_timeout 100 --send_timeout 100 --connect_timeout 100 --query "select number % 10, (rand() % 10) as l, randomString(l) as s, cityHash64(s) from numbers(100000)" | $CLICKHOUSE_CLIENT --receive_timeout 100 --send_timeout 100 --connect_timeout 100 --query "insert into huge_strings format TSV" &
done;
wait
$CLICKHOUSE_CLIENT -q "select count() from huge_strings"
$CLICKHOUSE_CLIENT -q "select sum(l = length(s)) from huge_strings"
$CLICKHOUSE_CLIENT -q "select sum(h = cityHash64(s)) from huge_strings"
$CLICKHOUSE_CLIENT -q "drop table huge_strings"

View File

@ -25,6 +25,8 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
client1.send('DROP TABLE IF EXISTS test.lv') client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt) client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv_sums')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.mt') client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt) client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.sums') client1.send('DROP TABLE IF EXISTS test.sums')
@ -39,12 +41,10 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
client3.expect(prompt) client3.expect(prompt)
client3.send("WATCH test.lv_sums FORMAT CSVWithNames") client3.send("WATCH test.lv_sums FORMAT CSVWithNames")
client3.expect('_version')
client1.send('INSERT INTO test.sums WATCH test.lv') client1.send('INSERT INTO test.sums WATCH test.lv')
client1.expect(r'INSERT INTO') client1.expect(r'INSERT INTO')
client1.expect(r'Progress')
client3.expect('0,1.*\r\n') client3.expect('0,1.*\r\n')
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)') client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
@ -67,7 +67,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
match = client1.expect('(%s)|([#\$] )' % prompt) match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]: if match.groups()[1]:
client1.send(client1.command) client1.send(client1.command)
client1.expect(prompt) client1.expect(prompt)
client2.send('DROP TABLE test.lv') client2.send('DROP TABLE test.lv')
client2.expect(prompt) client2.expect(prompt)