Merge branch 'master' into fix-header-for-union-stream-in-distributed-2

This commit is contained in:
Nikolai Kochetov 2018-04-13 23:11:04 +03:00
commit 41229b201b
146 changed files with 1727 additions and 1229 deletions

View File

@ -1,4 +1,4 @@
# ClickHouse 1.1.54370 Release Candidate, 2018-03-16
# ClickHouse release 1.1.54370, 2018-03-16
## New features:

View File

@ -1,4 +1,4 @@
# ClickHouse 1.1.54370 Release Candidate, 2018-03-16
# ClickHouse release 1.1.54370, 2018-03-16
## Новые возможности:

View File

@ -252,6 +252,7 @@ if (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/contrib/poco/cmake/FindODBC.cmake)
else ()
include (cmake/find_odbc.cmake)
endif ()
message (STATUS "Using odbc: ${ODBC_INCLUDE_DIRECTORIES} : ${ODBC_LIBRARIES}")
include (cmake/find_poco.cmake)
include (cmake/find_lz4.cmake)
include (cmake/find_sparsehash.cmake)

View File

@ -51,6 +51,7 @@
set(Poco_HINTS
/usr/local
/usr/local/include/Poco
C:/AppliedInformatics
${Poco_DIR}
$ENV{Poco_DIR}
@ -230,5 +231,3 @@ if(${Poco_OSP_FOUND})
endif()
message(STATUS "Found Poco: ${Poco_LIBRARIES}")

View File

@ -52,24 +52,33 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
set (Poco_MongoDB_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/poco/MongoDB/include/")
endif ()
if (ODBC_FOUND)
if (EXISTS "${ClickHouse_SOURCE_DIR}/contrib/poco/SQL/ODBC/include/")
set (Poco_SQL_FOUND 1)
if (EXISTS "${ClickHouse_SOURCE_DIR}/contrib/poco/SQL/ODBC/include/")
set (Poco_SQL_FOUND 1)
set (Poco_SQL_LIBRARY PocoSQL)
set (Poco_SQL_INCLUDE_DIRS
"${ClickHouse_SOURCE_DIR}/contrib/poco/SQL/include"
"${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include"
)
if (ODBC_FOUND)
set (Poco_SQLODBC_FOUND 1)
set (Poco_SQL_INCLUDE_DIRS
"${ClickHouse_SOURCE_DIR}/contrib/poco/SQL/include"
"${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include"
)
set (Poco_SQLODBC_INCLUDE_DIRS
"${ClickHouse_SOURCE_DIR}/contrib/poco/SQL/ODBC/include/"
"${ClickHouse_SOURCE_DIR}/contrib/poco/Data/ODBC/include/"
${ODBC_INCLUDE_DIRECTORIES}
)
set (Poco_SQL_LIBRARY PocoSQL)
set (Poco_SQLODBC_LIBRARY PocoSQLODBC ${ODBC_LIBRARIES} ${LTDL_LIBRARY})
else ()
endif ()
else ()
set (Poco_Data_FOUND 1)
set (Poco_Data_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include")
set (Poco_Data_LIBRARY PocoData)
if (ODBC_FOUND)
set (Poco_DataODBC_FOUND 1)
set (Poco_DataODBC_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/poco/Data/ODBC/include/" "${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include")
set (Poco_Data_LIBRARY PocoData)
set (Poco_DataODBC_INCLUDE_DIRS
"${ClickHouse_SOURCE_DIR}/contrib/poco/Data/ODBC/include/"
${ODBC_INCLUDE_DIRECTORIES}
)
set (Poco_DataODBC_LIBRARY PocoDataODBC ${ODBC_LIBRARIES} ${LTDL_LIBRARY})
endif ()
endif ()
@ -110,3 +119,5 @@ message(STATUS "Using Poco: ${Poco_INCLUDE_DIRS} : ${Poco_Foundation_LIBRARY},${
# ClickHouse-Extras/clickhouse_warning
# ClickHouse-Extras/clickhouse-purge-logs-on-no-space
# ClickHouse-Extras/clickhouse_freebsd
# ClickHouse-Extras/clikhouse_no_zlib
# ClickHouse-Extras/clickhouse-fix-atomic

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit a107b0c9cee109fe0abfbf509df3c78a1e0c05fa
Subproject commit 930a7ec1154f4f9711edfb4b4a39f9fff2a5bbb5

View File

@ -145,8 +145,6 @@ target_link_libraries (dbms
clickhouse_common_config
clickhouse_common_io
${MYSQLXX_LIBRARY}
${FARMHASH_LIBRARIES}
${METROHASH_LIBRARIES}
${RE2_LIBRARY}
${RE2_ST_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
@ -163,15 +161,24 @@ endif ()
if (Poco_SQLODBC_FOUND)
target_link_libraries (clickhouse_common_io ${Poco_SQL_LIBRARY})
target_include_directories (clickhouse_common_io PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQL_INCLUDE_DIRS})
target_link_libraries (dbms ${Poco_SQLODBC_LIBRARY} ${Poco_SQL_LIBRARY})
target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQLODBC_INCLUDE_DIRS} PUBLIC ${Poco_SQL_INCLUDE_DIRS})
if (NOT USE_INTERNAL_POCO_LIBRARY)
target_include_directories (clickhouse_common_io PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQL_INCLUDE_DIRS})
target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQLODBC_INCLUDE_DIRS} PUBLIC ${Poco_SQL_INCLUDE_DIRS})
endif()
endif()
if (Poco_Data_FOUND AND NOT USE_INTERNAL_POCO_LIBRARY)
target_include_directories (clickhouse_common_io PRIVATE ${Poco_Data_INCLUDE_DIRS})
target_include_directories (dbms PRIVATE ${Poco_Data_INCLUDE_DIRS})
endif()
if (Poco_DataODBC_FOUND)
target_link_libraries (clickhouse_common_io ${Poco_Data_LIBRARY})
target_link_libraries (dbms ${Poco_DataODBC_LIBRARY})
target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_DataODBC_INCLUDE_DIRS})
if (NOT USE_INTERNAL_POCO_LIBRARY)
target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_DataODBC_INCLUDE_DIRS})
endif()
endif()

View File

@ -1,6 +1,7 @@
# This strings autochanged from release_lib.sh:
set(VERSION_DESCRIBE v1.1.54373-testing)
set(VERSION_REVISION 54373)
set(VERSION_DESCRIBE v1.1.54378-testing)
set(VERSION_REVISION 54378)
set(VERSION_GITHASH 5b19d89133a5ff7c72e40cc8c0226cb00466ba10)
# end of autochange
set (VERSION_MAJOR 1)

View File

@ -234,7 +234,7 @@ bool Connection::ping()
{
// LOG_TRACE(log_wrapper.get(), "Ping");
TimeoutSetter timeout_setter(*socket, sync_request_timeout);
TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
try
{
UInt64 pong = 0;
@ -274,7 +274,7 @@ TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & req
if (!connected)
connect();
TimeoutSetter timeout_setter(*socket, sync_request_timeout);
TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
writeVarUInt(Protocol::Client::TablesStatusRequest, *out);
request.write(*out, server_revision);

View File

@ -8,24 +8,25 @@ namespace DB
{
/// Temporarily overrides socket send/recieve timeouts and reset them back into destructor
/// Timeouts could be only decreased
/// If "limit_max_timeout" is true, timeouts could be only decreased (maxed by previous value).
struct TimeoutSetter
{
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_)
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_,
bool limit_max_timeout = false)
: socket(socket_), send_timeout(send_timeout_), recieve_timeout(recieve_timeout_)
{
old_send_timeout = socket.getSendTimeout();
old_receive_timeout = socket.getReceiveTimeout();
if (old_send_timeout > send_timeout)
if (!limit_max_timeout || old_send_timeout > send_timeout)
socket.setSendTimeout(send_timeout);
if (old_receive_timeout > recieve_timeout)
if (!limit_max_timeout || old_receive_timeout > recieve_timeout)
socket.setReceiveTimeout(recieve_timeout);
}
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_)
: TimeoutSetter(socket_, timeout_, timeout_) {}
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_, bool limit_max_timeout = false)
: TimeoutSetter(socket_, timeout_, timeout_, limit_max_timeout) {}
~TimeoutSetter()
{

View File

@ -11,7 +11,7 @@
#include <IO/ReadBufferFromString.h>
#include <Common/Exception.h>
#include <Common/demangle.h>
#include <common/demangle.h>
namespace DB

View File

@ -3,7 +3,7 @@
#include <Core/Field.h>
#include <Core/AccurateComparison.h>
#include <common/DateLUT.h>
#include <Common/demangle.h>
#include <common/demangle.h>
class SipHash;

View File

@ -7,7 +7,7 @@
#include <sstream>
#include <Common/StackTrace.h>
#include <Common/demangle.h>
#include <common/demangle.h>
StackTrace::StackTrace()

View File

@ -1,6 +1,6 @@
#pragma once
#include <Common/demangle.h>
#include <common/demangle.h>
#include <Common/TypeList.h>
#include <Common/Exception.h>

View File

@ -33,8 +33,7 @@ public:
*
* identifier - if not empty, must uniquely (within same path) identify participant of leader election.
* It means that different participants of leader election have different identifiers
* and existence of more than one ephemeral node with same identifier indicates an error
* (see cleanOldEphemeralNodes).
* and existence of more than one ephemeral node with same identifier indicates an error.
*/
LeaderElection(const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
: path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
@ -42,10 +41,15 @@ public:
createNode();
}
void yield()
void shutdown()
{
releaseNode();
createNode();
if (shutdown_called)
return;
shutdown_called = true;
event->set();
if (thread.joinable())
thread.join();
}
~LeaderElection()
@ -63,14 +67,14 @@ private:
std::string node_name;
std::thread thread;
std::atomic<bool> shutdown {false};
std::atomic<bool> shutdown_called {false};
zkutil::EventPtr event = std::make_shared<Poco::Event>();
CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};
void createNode()
{
shutdown = false;
shutdown_called = false;
node = EphemeralNodeHolder::createSequential(path + "/leader_election-", zookeeper, identifier);
std::string node_path = node->getPath();
@ -81,16 +85,13 @@ private:
void releaseNode()
{
shutdown = true;
event->set();
if (thread.joinable())
thread.join();
shutdown();
node = nullptr;
}
void threadFunction()
{
while (!shutdown)
while (!shutdown_called)
{
bool success = false;

View File

@ -12,7 +12,7 @@
#include <Common/randomSeed.h>
#define ZOOKEEPER_CONNECTION_TIMEOUT_MS 1000
#define ZOOKEEPER_OPERATION_TIMEOUT_MS 1000
#define ZOOKEEPER_OPERATION_TIMEOUT_MS 10000
namespace DB
@ -49,6 +49,9 @@ void ZooKeeper::init(const std::string & hosts_, const std::string & identity_,
session_timeout_ms = session_timeout_ms_;
chroot = chroot_;
if (hosts.empty())
throw KeeperException("No addresses passed to ZooKeeper constructor.", ZooKeeperImpl::ZooKeeper::ZBADARGUMENTS);
std::vector<std::string> addresses_strings;
boost::split(addresses_strings, hosts, boost::is_any_of(","));
ZooKeeperImpl::ZooKeeper::Addresses addresses;
@ -320,7 +323,7 @@ bool ZooKeeper::exists(const std::string & path, Stat * stat, const EventPtr & w
return existsWatch(path, stat, callbackForEvent(watch));
}
bool ZooKeeper::existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback)
bool ZooKeeper::existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback)
{
int32_t code = existsImpl(path, stat, watch_callback);
@ -369,7 +372,7 @@ bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat,
return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code);
}
bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat, const WatchCallback & watch_callback, int * return_code)
bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback, int * return_code)
{
int32_t code = getImpl(path, res, stat, watch_callback);
@ -527,39 +530,53 @@ void ZooKeeper::tryRemoveRecursive(const std::string & path)
}
void ZooKeeper::waitForDisappear(const std::string & path)
namespace
{
while (true)
struct WaitForDisappearState
{
int32_t code = 0;
int32_t event_type = 0;
Poco::Event event;
};
using WaitForDisappearStatePtr = std::shared_ptr<WaitForDisappearState>;
}
auto callback = [&](const ZooKeeperImpl::ZooKeeper::ExistsResponse & response)
void ZooKeeper::waitForDisappear(const std::string & path)
{
WaitForDisappearStatePtr state = std::make_shared<WaitForDisappearState>();
while (true)
{
auto callback = [state](const ZooKeeperImpl::ZooKeeper::ExistsResponse & response)
{
code = response.error;
if (code)
event.set();
state->code = response.error;
if (state->code)
state->event.set();
};
auto watch = [&](const ZooKeeperImpl::ZooKeeper::WatchResponse & response)
auto watch = [state](const ZooKeeperImpl::ZooKeeper::WatchResponse & response)
{
code = response.error;
if (!code)
event_type = response.type;
event.set();
if (!state->code)
{
state->code = response.error;
if (!state->code)
state->event_type = response.type;
state->event.set();
}
};
/// NOTE: if the node doesn't exist, the watch will leak.
impl->exists(path, callback, watch);
event.wait();
state->event.wait();
if (code == ZooKeeperImpl::ZooKeeper::ZNONODE)
if (state->code == ZooKeeperImpl::ZooKeeper::ZNONODE)
return;
if (code)
throw KeeperException(code, path);
if (state->code)
throw KeeperException(state->code, path);
if (event_type == ZooKeeperImpl::ZooKeeper::DELETED)
if (state->event_type == ZooKeeperImpl::ZooKeeper::DELETED)
return;
}
}

View File

@ -110,7 +110,7 @@ public:
int32_t tryRemove(const std::string & path, int32_t version = -1);
bool exists(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
bool existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback);
bool existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback);
std::string get(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
@ -118,7 +118,7 @@ public:
/// * The node doesn't exist. Returns false in this case.
bool tryGet(const std::string & path, std::string & res, Stat * stat = nullptr, const EventPtr & watch = nullptr, int * code = nullptr);
bool tryGetWatch(const std::string & path, std::string & res, Stat * stat, const WatchCallback & watch_callback, int * code = nullptr);
bool tryGetWatch(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback, int * code = nullptr);
void set(const std::string & path, const std::string & data,
int32_t version = -1, Stat * stat = nullptr);

View File

@ -1,6 +1,7 @@
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h>
@ -505,7 +506,7 @@ ZooKeeper::ZooKeeper(
Poco::Timespan operation_timeout)
: root_path(root_path_),
session_timeout(session_timeout),
operation_timeout(operation_timeout)
operation_timeout(std::min(operation_timeout, session_timeout))
{
if (!root_path.empty())
{
@ -532,9 +533,6 @@ ZooKeeper::ZooKeeper(
connect(addresses, connection_timeout);
sendHandshake();
receiveHandshake();
if (!auth_scheme.empty())
sendAuth(auth_scheme, auth_data);
@ -549,6 +547,9 @@ void ZooKeeper::connect(
const Addresses & addresses,
Poco::Timespan connection_timeout)
{
if (addresses.empty())
throw Exception("No addresses passed to ZooKeeperImpl constructor", ZBADARGUMENTS);
static constexpr size_t num_tries = 3;
bool connected = false;
@ -559,13 +560,25 @@ void ZooKeeper::connect(
{
try
{
socket = Poco::Net::StreamSocket(); /// Reset the state of previous attempt.
socket.connect(address, connection_timeout);
socket.setReceiveTimeout(operation_timeout);
socket.setSendTimeout(operation_timeout);
socket.setNoDelay(true);
in.emplace(socket);
out.emplace(socket);
sendHandshake();
receiveHandshake();
connected = true;
break;
}
catch (const Poco::Net::NetException & e)
{
fail_reasons << "\n" << getCurrentExceptionMessage(false);
fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << address.toString();
}
catch (const Poco::TimeoutException & e)
{
@ -591,16 +604,9 @@ void ZooKeeper::connect(
out << address.toString();
}
out << fail_reasons.str();
out << fail_reasons.str() << "\n";
throw Exception(out.str(), ZCONNECTIONLOSS);
}
socket.setReceiveTimeout(operation_timeout);
socket.setSendTimeout(operation_timeout);
socket.setNoDelay(true);
in.emplace(socket);
out.emplace(socket);
}
@ -685,6 +691,8 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
void ZooKeeper::sendThread()
{
setThreadName("ZooKeeperSend");
auto prev_heartbeat_time = clock::now();
try
@ -703,12 +711,33 @@ void ZooKeeper::sendThread()
std::chrono::duration_cast<std::chrono::milliseconds>(next_heartbeat_time - now).count(),
operation_timeout.totalMilliseconds());
RequestPtr request;
if (requests.tryPop(request, max_wait))
RequestInfo info;
if (requests_queue.tryPop(info, max_wait))
{
request->write(*out);
/// After we popped element from the queue, we must register callbacks (even in the case when expired == true right now),
/// because they must not be lost (callbacks must be called because the user will wait for them).
if (request->xid == close_xid)
if (info.request->xid != close_xid)
{
CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest);
std::lock_guard lock(operations_mutex);
operations[info.request->xid] = info;
}
if (info.watch)
{
info.request->has_watch = true;
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
std::lock_guard lock(watches_mutex);
watches[info.request->getPath()].emplace_back(std::move(info.watch));
}
if (expired)
break;
info.request->write(*out);
if (info.request->xid == close_xid)
break;
}
}
@ -730,16 +759,13 @@ void ZooKeeper::sendThread()
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize(true, false);
}
/// Drain queue
RequestPtr request;
while (requests.tryPop(request))
;
}
void ZooKeeper::receiveThread()
{
setThreadName("ZooKeeperRecv");
try
{
Int64 waited = 0;
@ -960,6 +986,8 @@ void ZooKeeper::receiveEvent()
if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), ZMARSHALLINGERROR);
/// NOTE: Exception in callback will propagate to receiveThread and will lead to session expiration. This is Ok.
if (request_info.callback)
request_info.callback(*response);
}
@ -967,63 +995,141 @@ void ZooKeeper::receiveEvent()
void ZooKeeper::finalize(bool error_send, bool error_receive)
{
bool expired_prev = false;
if (expired.compare_exchange_strong(expired_prev, true))
std::unique_lock lock(finalize_mutex, std::defer_lock);
if (!lock.try_lock())
return;
if (expired)
return;
expired = true;
active_session_metric_increment.destroy();
try
{
if (!error_send)
{
/// Send close event. This also signals sending thread to wakeup and then stop.
try
{
close();
}
catch (...)
{
/// This happens for example, when "Cannot push request to queue within operation timeout".
tryLogCurrentException(__PRETTY_FUNCTION__);
}
send_thread.join();
}
try
{
if (!error_send)
{
/// Send close event. This also signals sending thread to wakeup and then stop.
close();
send_thread.join();
}
/// This will also wakeup receiving thread.
/// This will also wakeup the receiving thread.
socket.shutdown();
if (!error_receive)
receive_thread.join();
{
std::lock_guard lock(operations_mutex);
for (auto & op : operations)
{
RequestInfo & request_info = op.second;
ResponsePtr response = request_info.request->makeResponse();
response->error = ZCONNECTIONLOSS;
if (request_info.callback)
request_info.callback(*response);
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size());
operations.clear();
}
{
std::lock_guard lock(watches_mutex);
for (auto & path_watches : watches)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZCONNECTIONLOSS;
for (auto & callback : path_watches.second)
if (callback)
callback(response);
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size());
watches.clear();
}
}
catch (...)
{
/// We must continue to execute all callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!error_receive)
receive_thread.join();
{
std::lock_guard lock(operations_mutex);
for (auto & op : operations)
{
RequestInfo & request_info = op.second;
ResponsePtr response = request_info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
if (request_info.callback)
{
try
{
request_info.callback(*response);
}
catch (...)
{
/// We must continue to all other callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size());
operations.clear();
}
{
std::lock_guard lock(watches_mutex);
for (auto & path_watches : watches)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
for (auto & callback : path_watches.second)
{
if (callback)
{
try
{
callback(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size());
watches.clear();
}
/// Drain queue
RequestInfo info;
while (requests_queue.tryPop(info))
{
if (info.callback)
{
ResponsePtr response = info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
try
{
info.callback(*response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (info.watch)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
try
{
info.watch(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -1227,39 +1333,38 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
void ZooKeeper::pushRequest(RequestInfo && info)
{
/// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop.
if (expired && info.request->xid != close_xid)
throw Exception("Session expired", ZSESSIONEXPIRED);
info.request->addRootPath(root_path);
info.time = clock::now();
if (!info.request->xid)
try
{
info.request->xid = xid.fetch_add(1);
if (info.request->xid < 0)
throw Exception("XID overflow", ZSESSIONEXPIRED);
info.request->addRootPath(root_path);
info.time = clock::now();
if (!info.request->xid)
{
info.request->xid = xid.fetch_add(1);
if (info.request->xid < 0)
throw Exception("XID overflow", ZSESSIONEXPIRED);
}
/// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls
/// to avoid forgotten operations in the queue when session is expired.
/// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest'
/// and the queue will be drained in 'finalize'.
std::lock_guard lock(finalize_mutex);
if (expired)
throw Exception("Session expired", ZSESSIONEXPIRED);
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
}
catch (...)
{
finalize(false, false);
throw;
}
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
{
CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest);
std::lock_guard lock(operations_mutex);
operations[info.request->xid] = info;
}
if (info.watch)
{
info.request->has_watch = true;
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
std::lock_guard lock(watches_mutex);
watches[info.request->getPath()].emplace_back(std::move(info.watch));
}
if (!requests.tryPush(info.request, operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
}
@ -1426,7 +1531,9 @@ void ZooKeeper::close()
RequestInfo request_info;
request_info.request = std::make_shared<CloseRequest>(std::move(request));
pushRequest(std::move(request_info));
if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push close request to queue within operation timeout", ZOPERATIONTIMEOUT);
ProfileEvents::increment(ProfileEvents::ZooKeeperClose);
}

View File

@ -24,6 +24,54 @@
#include <functional>
/** ZooKeeper C++ library, a replacement for libzookeeper.
*
* Motivation.
*
* libzookeeper has many bugs:
* - segfaults: for example, if zookeeper connection was interrupted while reading result of multi response;
* - memory corruption: for example, as a result of double free inside libzookeeper;
* - no timeouts for synchronous operations: they may stuck forever under simple Jepsen-like tests;
* - logical errors: for example, chroot prefix is not removed from the results of multi responses.
* - data races;
*
* The code of libzookeeper is over complicated:
* - memory ownership is unclear and bugs are very difficult to track and fix.
* - extremely creepy code for implementation of "chroot" feature.
*
* As of 2018, there are no active maintainers of libzookeeper:
* - bugs in JIRA are fixed only occasionaly with ad-hoc patches by library users.
*
* libzookeeper is a classical example of bad code written in C.
*
* In Go, Python and Rust programming languages,
* there are separate libraries for ZooKeeper, not based on libzookeeper.
* Motivation is almost the same. Example:
* https://github.com/python-zk/kazoo/blob/master/docs/implementation.rst
*
* About "session restore" feature.
*
* libzookeeper has the feature of session restore. Client receives session id and session token from the server,
* and when connection is lost, it can quickly reconnect to any server with the same session id and token,
* to continue with existing session.
* libzookeeper performs this reconnection automatically.
*
* This feature is proven to be harmful.
* For example, it makes very difficult to correctly remove ephemeral nodes.
* This may lead to weird bugs in application code.
* For example, our developers have found that type of bugs in Curator Java library.
*
* On the other side, session restore feature has no advantages,
* because every application should be able to establish new session and reinitialize internal state,
* when the session is lost and cannot be restored.
*
* This library never restores the session. In case of any error, the session is considered as expired
* and you should create a new instance of ZooKeeper object and reinitialize the application state.
*
* This library is not intended to be CPU efficient. Hundreds of thousands operations per second is usually enough.
*/
namespace CurrentMetrics
{
extern const Metric ZooKeeperSession;
@ -61,7 +109,7 @@ public:
* - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap:
* for example, just signal a condvar / fulfull a promise.
* - you also may provide callbacks for watches; they are also invoked in internal thread and must be cheap.
* - whenever you receive SessionExpired exception of method isValid returns false,
* - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true,
* the ZooKeeper instance is no longer usable - you may only destroy it and probably create another.
* - whenever session is expired or ZooKeeper instance is destroying, all callbacks are notified with special event.
* - data for callbacks must be alive when ZooKeeper instance is alive.
@ -391,6 +439,15 @@ public:
using CheckCallback = std::function<void(const CheckResponse &)>;
using MultiCallback = std::function<void(const MultiResponse &)>;
/// If the method will throw an exception, callbacks won't be called.
///
/// After the method is executed successfully, you must wait for callbacks
/// (don't destroy callback data before it will be called).
///
/// All callbacks are executed sequentially (the execution of callbacks is serialized).
///
/// If an exception is thrown inside the callback, the session will expire,
/// and all other callbacks will be called with "Session expired" error.
void create(
const String & path,
@ -513,7 +570,10 @@ private:
std::optional<WriteBufferFromPocoSocket> out;
int64_t session_id = 0;
std::atomic<XID> xid {1};
std::atomic<bool> expired {false};
std::mutex finalize_mutex;
using clock = std::chrono::steady_clock;
@ -525,9 +585,9 @@ private:
clock::time_point time;
};
using RequestsQueue = ConcurrentBoundedQueue<RequestPtr>;
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests{1};
RequestsQueue requests_queue{1};
void pushRequest(RequestInfo && request);
using Operations = std::map<XID, RequestInfo>;
@ -544,8 +604,6 @@ private:
std::thread send_thread;
std::thread receive_thread;
std::atomic<bool> expired {false};
void connect(
const Addresses & addresses,
Poco::Timespan connection_timeout);
@ -571,7 +629,7 @@ private:
template <typename T>
void read(T &);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ZooKeeperSession};
CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
};
};

View File

@ -6,6 +6,7 @@ const char * auto_config_build[]
{
"VERSION_FULL", "@VERSION_FULL@",
"VERSION_DESCRIBE", "@VERSION_DESCRIBE@",
"VERSION_GITHASH", "@VERSION_GITHASH@",
"BUILD_DATE", "@BUILD_DATE@",
"BUILD_TYPE", "@CMAKE_BUILD_TYPE@",
"SYSTEM", "@CMAKE_SYSTEM@",

View File

@ -16,3 +16,4 @@
#cmakedefine VERSION_STRING "@VERSION_STRING@"
#cmakedefine VERSION_FULL "@VERSION_FULL@"
#cmakedefine VERSION_DESCRIBE "@VERSION_DESCRIBE@"
#cmakedefine VERSION_GITHASH "@VERSION_GITHASH@"

View File

@ -6,7 +6,7 @@
#include <string>
#include <Common/Exception.h>
#include <Common/demangle.h>
#include <common/demangle.h>
namespace DB

View File

@ -5,6 +5,7 @@
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Field.h>
#include <Core/NamesAndTypes.h>
#include <Common/FieldVisitors.h>
#include <Common/COWPtr.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/IDataType.h>
@ -13,19 +14,19 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Parsers/IAST.h>
namespace DB
{
std::ostream & operator<<(std::ostream & stream, const IBlockInputStream & what)
{
stream << "IBlockInputStream(name = " << what.getName() << ")";
//what.dumpTree(stream); // todo: set const
return stream;
}
std::ostream & operator<<(std::ostream & stream, const Field & what)
{
stream << "Field(type = " << what.getTypeName() << ")";
stream << applyVisitor(FieldVisitorDump(), what);
return stream;
}
@ -46,7 +47,6 @@ std::ostream & operator<<(std::ostream & stream, const IStorage & what)
stream << "IStorage(name = " << what.getName() << ", tableName = " << what.getTableName() << ") {"
<< what.getColumns().getAllPhysical().toString()
<< "}";
// isRemote supportsSampling supportsFinal supportsPrewhere
return stream;
}
@ -71,27 +71,10 @@ std::ostream & operator<<(std::ostream & stream, const Block & what)
return stream;
}
template <typename T>
std::ostream & printCOWPtr(std::ostream & stream, const typename COWPtr<T>::Ptr & what)
{
stream << "COWPtr::Ptr(" << what.get();
if (what)
stream << ", use_count = " << what->use_count();
stream << ") {";
if (what)
stream << *what;
else
stream << "nullptr";
stream << "}";
return stream;
}
std::ostream & operator<<(std::ostream & stream, const ColumnWithTypeAndName & what)
{
stream << "ColumnWithTypeAndName(name = " << what.name << ", type = " << what.type << ", column = ";
return printCOWPtr<IColumn>(stream, what.column) << ")";
return dumpValue(stream, what.column) << ")";
}
std::ostream & operator<<(std::ostream & stream, const IColumn & what)
@ -112,15 +95,6 @@ std::ostream & operator<<(std::ostream & stream, const Connection::Packet & what
return stream;
}
std::ostream & operator<<(std::ostream & stream, const SubqueryForSet & what)
{
stream << "SubqueryForSet(source = " << what.source
// TODO: << ", set = " << what.set << ", join = " << what.join
<< ", table = " << what.table
<< ")";
return stream;
}
std::ostream & operator<<(std::ostream & stream, const IAST & what)
{
stream << "IAST{";
@ -129,14 +103,4 @@ std::ostream & operator<<(std::ostream & stream, const IAST & what)
return stream;
}
std::ostream & operator<<(std::ostream & stream, const ExpressionAnalyzer & what)
{
stream << "ExpressionAnalyzer{"
<< "hasAggregation=" << what.hasAggregation()
<< ", SubqueriesForSet=" << what.getSubqueriesForSets()
<< ", ExternalTables=" << what.getExternalTables()
<< "}";
return stream;
}
}

View File

@ -2,7 +2,7 @@
#include <iostream>
#include <Client/Connection.h>
#include <Common/PODArray.h>
namespace DB
{
@ -37,26 +37,11 @@ std::ostream & operator<<(std::ostream & stream, const ColumnWithTypeAndName & w
class IColumn;
std::ostream & operator<<(std::ostream & stream, const IColumn & what);
struct SubqueryForSet;
std::ostream & operator<<(std::ostream & stream, const SubqueryForSet & what);
class IAST;
std::ostream & operator<<(std::ostream & stream, const IAST & what);
class ExpressionAnalyzer;
std::ostream & operator<<(std::ostream & stream, const ExpressionAnalyzer & what);
std::ostream & operator<<(std::ostream & stream, const Connection::Packet & what);
template <typename T, size_t INITIAL_SIZE, typename TAllocator, size_t pad_right_>
std::ostream & operator<<(std::ostream & stream, const PODArray<T, INITIAL_SIZE, TAllocator, pad_right_> & what)
{
stream << "PODArray(size = " << what.size() << ", capacity = " << what.capacity() << ")";
dumpContainer(stream, what);
return stream;
};
}
/// some operator<< should be declared before operator<<(... std::shared_ptr<>)

View File

@ -12,18 +12,46 @@ namespace ErrorCodes
}
AggregatingSortedBlockInputStream::AggregatingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_)
{
/// Fill in the column numbers that need to be aggregated.
for (size_t i = 0; i < num_columns; ++i)
{
ColumnWithTypeAndName & column = header.safeGetByPosition(i);
/// We leave only states of aggregate functions.
if (!startsWith(column.type->getName(), "AggregateFunction"))
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
/// Included into PK?
SortDescription::const_iterator it = description.begin();
for (; it != description.end(); ++it)
if (it->column_name == column.name || (it->column_name.empty() && it->column_number == i))
break;
if (it != description.end())
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
column_numbers_to_aggregate.push_back(i);
}
}
Block AggregatingSortedBlockInputStream::readImpl()
{
if (finished)
return Block();
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
init(merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
@ -31,37 +59,6 @@ Block AggregatingSortedBlockInputStream::readImpl()
if (merged_columns.empty())
return Block();
/// Additional initialization.
if (next_key.empty())
{
/// Fill in the column numbers that need to be aggregated.
for (size_t i = 0; i < num_columns; ++i)
{
ColumnWithTypeAndName & column = header.safeGetByPosition(i);
/// We leave only states of aggregate functions.
if (!startsWith(column.type->getName(), "AggregateFunction"))
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
/// Included into PK?
SortDescription::const_iterator it = description.begin();
for (; it != description.end(); ++it)
if (it->column_name == column.name || (it->column_name.empty() && it->column_number == i))
break;
if (it != description.end())
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
column_numbers_to_aggregate.push_back(i);
}
}
columns_to_aggregate.resize(column_numbers_to_aggregate.size());
for (size_t i = 0, size = columns_to_aggregate.size(); i < size; ++i)
columns_to_aggregate[i] = typeid_cast<ColumnAggregateFunction *>(merged_columns[column_numbers_to_aggregate[i]].get());

View File

@ -21,10 +21,8 @@ namespace DB
class AggregatingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
AggregatingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_)
{
}
AggregatingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_);
String getName() const override { return "AggregatingSorted"; }

View File

@ -108,13 +108,8 @@ Block CollapsingSortedBlockInputStream::readImpl()
if (finished)
return {};
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
init(merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
@ -122,11 +117,6 @@ Block CollapsingSortedBlockInputStream::readImpl()
if (merged_columns.empty())
return {};
/// Additional initialization.
if (first_negative.empty())
sign_column_number = header.getPositionByName(sign_column);
merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns));
}

View File

@ -25,10 +25,10 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
public:
CollapsingSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr)
const String & sign_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, sign_column(sign_column_)
{
sign_column_number = header.getPositionByName(sign_column);
}
String getName() const override { return "CollapsingSorted"; }
@ -38,8 +38,7 @@ protected:
Block readImpl() override;
private:
String sign_column;
size_t sign_column_number = 0;
size_t sign_column_number;
Logger * log = &Logger::get("CollapsingSortedBlockInputStream");

View File

@ -12,6 +12,31 @@ namespace ErrorCodes
}
GraphiteRollupSortedBlockInputStream::GraphiteRollupSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
const Graphite::Params & params, time_t time_of_merge)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
params(params), time_of_merge(time_of_merge)
{
size_t max_size_of_aggregate_state = 0;
for (const auto & pattern : params.patterns)
if (pattern.function->sizeOfData() > max_size_of_aggregate_state)
max_size_of_aggregate_state = pattern.function->sizeOfData();
place_for_aggregate_state.resize(max_size_of_aggregate_state);
/// Memoize column numbers in block.
path_column_num = header.getPositionByName(params.path_column_name);
time_column_num = header.getPositionByName(params.time_column_name);
value_column_num = header.getPositionByName(params.value_column_name);
version_column_num = header.getPositionByName(params.version_column_name);
for (size_t i = 0; i < num_columns; ++i)
if (i != time_column_num && i != value_column_num && i != version_column_num)
unmodified_column_numbers.push_back(i);
}
const Graphite::Pattern * GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const
{
for (const auto & pattern : params.patterns)
@ -68,10 +93,8 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
if (finished)
return Block();
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
init(merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
@ -79,27 +102,6 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
if (merged_columns.empty())
return Block();
/// Additional initialization.
if (is_first)
{
size_t max_size_of_aggregate_state = 0;
for (const auto & pattern : params.patterns)
if (pattern.function->sizeOfData() > max_size_of_aggregate_state)
max_size_of_aggregate_state = pattern.function->sizeOfData();
place_for_aggregate_state.resize(max_size_of_aggregate_state);
/// Memoize column numbers in block.
path_column_num = header.getPositionByName(params.path_column_name);
time_column_num = header.getPositionByName(params.time_column_name);
value_column_num = header.getPositionByName(params.value_column_name);
version_column_num = header.getPositionByName(params.version_column_name);
for (size_t i = 0; i < num_columns; ++i)
if (i != time_column_num && i != value_column_num && i != version_column_num)
unmodified_column_numbers.push_back(i);
}
merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns));
}

View File

@ -126,12 +126,8 @@ class GraphiteRollupSortedBlockInputStream : public MergingSortedBlockInputStrea
{
public:
GraphiteRollupSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_,
const Graphite::Params & params, time_t time_of_merge)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
params(params), time_of_merge(time_of_merge)
{
}
const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
const Graphite::Params & params, time_t time_of_merge);
String getName() const override { return "GraphiteRollupSorted"; }

View File

@ -238,7 +238,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
if (process_list_elem)
{
if (!process_list_elem->updateProgressIn(value))
cancel(false);
cancel(/* kill */ true);
/// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
@ -339,6 +339,21 @@ void IProfilingBlockInputStream::cancel(bool kill)
}
bool IProfilingBlockInputStream::isCancelled() const
{
return is_cancelled;
}
bool IProfilingBlockInputStream::isCancelledOrThrowIfKilled() const
{
if (!is_cancelled)
return false;
if (is_killed)
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return true;
}
void IProfilingBlockInputStream::setProgressCallback(const ProgressCallback & callback)
{
progress_callback = callback;

View File

@ -119,21 +119,8 @@ public:
*/
virtual void cancel(bool kill);
/** Do you want to abort the receipt of data.
*/
bool isCancelled() const
{
return is_cancelled.load(std::memory_order_seq_cst);
}
bool isCancelledOrThrowIfKilled() const
{
if (!isCancelled())
return false;
if (is_killed)
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return true;
}
bool isCancelled() const;
bool isCancelledOrThrowIfKilled() const;
/** What limitations and quotas should be checked.
* LIMITS_CURRENT - checks amount of data read by current stream only (BlockStreamProfileInfo is used for check).
@ -189,7 +176,7 @@ public:
protected:
BlockStreamProfileInfo info;
std::atomic<bool> is_cancelled{false};
bool is_killed{false};
std::atomic<bool> is_killed{false};
ProgressCallback progress_callback;
ProcessListElement * process_list_elem = nullptr;

View File

@ -63,14 +63,23 @@ static void enrichBlockWithConstants(Block & block, const Block & header)
}
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, size_t limit_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
{
children.push_back(input);
header = children.at(0)->getHeader();
header_without_constants = header;
removeConstantsFromBlock(header_without_constants);
removeConstantsFromSortDescription(header, description);
}
Block MergeSortingBlockInputStream::readImpl()
{
if (!header)
{
header = getHeader();
removeConstantsFromSortDescription(header, description);
}
/** Algorithm:
* - read to memory blocks from source stream;
* - if too many of them and if external sorting is enabled,
@ -103,7 +112,7 @@ Block MergeSortingBlockInputStream::readImpl()
const std::string & path = temporary_files.back()->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf, 0, block.cloneEmpty());
NativeBlockOutputStream block_out(compressed_buf, 0, header_without_constants);
MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit);
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
@ -133,7 +142,7 @@ Block MergeSortingBlockInputStream::readImpl()
/// Create sorted streams to merge.
for (const auto & file : temporary_files)
{
temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path()));
temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path(), header_without_constants));
inputs_to_merge.emplace_back(temporary_inputs.back()->block_in);
}

View File

@ -73,12 +73,7 @@ public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, size_t limit_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
{
children.push_back(input);
}
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
String getName() const override { return "MergeSorting"; }
@ -86,7 +81,7 @@ public:
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
@ -109,6 +104,7 @@ private:
/// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files)
/// Save original block structure here.
Block header;
Block header_without_constants;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;
@ -120,8 +116,8 @@ private:
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, 0)) {}
TemporaryFileStream(const std::string & path, const Block & header)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header, 0)) {}
};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;

View File

@ -15,15 +15,17 @@ namespace ErrorCodes
MergingSortedBlockInputStream::MergingSortedBlockInputStream(
BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_, WriteBuffer * out_row_sources_buf_, bool quiet_)
const BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_, WriteBuffer * out_row_sources_buf_, bool quiet_)
: description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
, source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
header = children.at(0)->getHeader();
num_columns = header.columns();
}
void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged_columns)
void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
{
/// Read the first blocks, initialize the queue.
if (first)
@ -44,9 +46,6 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
if (rows == 0)
continue;
if (!num_columns)
num_columns = shared_block_ptr->columns();
if (expected_block_size < rows)
expected_block_size = std::min(rows, max_block_size);
@ -62,32 +61,9 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
initQueue(queue);
}
/// Initialize the result.
/// We clone the structure of the first non-empty source block.
{
auto it = source_blocks.cbegin();
for (; it != source_blocks.cend(); ++it)
{
const SharedBlockPtr & shared_block_ptr = *it;
if (*shared_block_ptr)
{
header = shared_block_ptr->cloneEmpty();
break;
}
}
/// If all the input blocks are empty.
if (it == source_blocks.cend())
return;
}
/// Let's check that all source blocks have the same structure.
for (auto it = source_blocks.cbegin(); it != source_blocks.cend(); ++it)
for (const SharedBlockPtr & shared_block_ptr : source_blocks)
{
const SharedBlockPtr & shared_block_ptr = *it;
if (!*shared_block_ptr)
continue;
@ -120,10 +96,9 @@ Block MergingSortedBlockInputStream::readImpl()
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
init(merged_columns);
if (merged_columns.empty())
return {};

View File

@ -65,8 +65,8 @@ public:
* quiet - don't log profiling info
*/
MergingSortedBlockInputStream(
BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
size_t limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false);
const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
size_t limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false);
String getName() const override { return "MergingSorted"; }
@ -74,7 +74,7 @@ public:
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
Block getHeader() const override { return header; }
protected:
struct RowRef
@ -120,14 +120,16 @@ protected:
void readSuffixImpl() override;
/// Initializes the queue and the next result block.
void init(Block & header, MutableColumns & merged_columns);
/// Initializes the queue and the columns of next result block.
void init(MutableColumns & merged_columns);
/// Gets the next block from the source corresponding to the `current`.
template <typename TSortCursor>
void fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue);
Block header;
const SortDescription description;
const size_t max_block_size;
size_t limit;

View File

@ -47,7 +47,27 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const
void RemoteBlockOutputStream::write(const Block & block)
{
assertBlocksHaveEqualStructure(block, header, "RemoteBlockOutputStream");
connection.sendData(block);
try
{
connection.sendData(block);
}
catch (const NetException & e)
{
/// Try to get more detailed exception from server
if (connection.poll(0))
{
Connection::Packet packet = connection.receivePacket();
if (Protocol::Server::Exception == packet.type)
{
packet.exception->rethrow();
return;
}
}
throw;
}
}

View File

@ -35,13 +35,8 @@ Block ReplacingSortedBlockInputStream::readImpl()
if (finished)
return Block();
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
init(merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
@ -49,13 +44,6 @@ Block ReplacingSortedBlockInputStream::readImpl()
if (merged_columns.empty())
return Block();
/// Additional initialization.
if (selected_row.empty())
{
if (!version_column.empty())
version_column_number = header.getPositionByName(version_column);
}
merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns));
}

View File

@ -15,11 +15,13 @@ namespace DB
class ReplacingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
ReplacingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & version_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_),
version_column(version_column_)
ReplacingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_,
const String & version_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
{
if (!version_column.empty())
version_column_number = header.getPositionByName(version_column);
}
String getName() const override { return "ReplacingSorted"; }
@ -29,7 +31,6 @@ protected:
Block readImpl() override;
private:
String version_column;
ssize_t version_column_number = -1;
Logger * log = &Logger::get("ReplacingSortedBlockInputStream");

View File

@ -14,6 +14,7 @@
#include <Functions/FunctionHelpers.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -23,6 +24,168 @@ namespace ErrorCodes
}
namespace
{
bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number)
{
for (auto & desc : description)
if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number))
return true;
return false;
}
}
SummingSortedBlockInputStream::SummingSortedBlockInputStream(
const BlockInputStreams & inputs_,
const SortDescription & description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_)
{
current_row.resize(num_columns);
/// name of nested structure -> the column numbers that refer to it.
std::unordered_map<std::string, std::vector<size_t>> discovered_maps;
/** Fill in the column numbers, which must be summed.
* This can only be numeric columns that are not part of the sort key.
* If a non-empty column_names_to_sum is specified, then we only take these columns.
* Some columns from column_names_to_sum may not be found. This is ignored.
*/
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
/// Discover nested Maps and find columns for summation
if (typeid_cast<const DataTypeArray *>(column.type.get()))
{
const auto map_name = Nested::extractTableName(column.name);
/// if nested table name ends with `Map` it is a possible candidate for special handling
if (map_name == column.name || !endsWith(map_name, "Map"))
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
discovered_maps[map_name].emplace_back(i);
}
else
{
if (!column.type->isSummable())
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
/// Are they inside the PK?
if (isInPrimaryKey(description, column.name, i))
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
if (column_names_to_sum.empty()
|| column_names_to_sum.end() !=
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
{
// Create aggregator to sum this column
AggregateDescription desc;
desc.column_numbers = {i};
desc.init("sumWithOverflow", {column.type});
columns_to_aggregate.emplace_back(std::move(desc));
}
else
{
// Column is not going to be summed, use last value
column_numbers_not_to_aggregate.push_back(i);
}
}
}
/// select actual nested Maps from list of candidates
for (const auto & map : discovered_maps)
{
/// map should contain at least two elements (key -> value)
if (map.second.size() < 2)
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
/// no elements of map could be in primary key
auto column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it))
break;
if (column_num_it != map.second.end())
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
DataTypes argument_types;
AggregateDescription desc;
MapDescription map_desc;
column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
{
const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it);
const String & name = key_col.name;
const IDataType & nested_type = *static_cast<const DataTypeArray *>(key_col.type.get())->getNestedType();
if (column_num_it == map.second.begin()
|| endsWith(name, "ID")
|| endsWith(name, "Key")
|| endsWith(name, "Type"))
{
if (!nested_type.isValueRepresentedByInteger())
break;
map_desc.key_col_nums.push_back(*column_num_it);
}
else
{
if (!nested_type.isSummable())
break;
map_desc.val_col_nums.push_back(*column_num_it);
}
// Add column to function arguments
desc.column_numbers.push_back(*column_num_it);
argument_types.push_back(key_col.type);
}
if (column_num_it != map.second.end())
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
if (map_desc.key_col_nums.size() == 1)
{
// Create summation for all value columns in the map
desc.init("sumMap", argument_types);
columns_to_aggregate.emplace_back(std::move(desc));
}
else
{
// Fall back to legacy mergeMaps for composite keys
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
maps_to_sum.emplace_back(std::move(map_desc));
}
}
}
void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion)
{
for (auto & desc : columns_to_aggregate)
@ -77,31 +240,13 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me
}
namespace
{
bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number)
{
for (auto & desc : description)
if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number))
return true;
return false;
}
}
Block SummingSortedBlockInputStream::readImpl()
{
if (finished)
return Block();
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
init(merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
@ -109,150 +254,7 @@ Block SummingSortedBlockInputStream::readImpl()
if (merged_columns.empty())
return {};
/// Additional initialization.
if (current_row.empty())
{
current_row.resize(num_columns);
/// name of nested structure -> the column numbers that refer to it.
std::unordered_map<std::string, std::vector<size_t>> discovered_maps;
/** Fill in the column numbers, which must be summed.
* This can only be numeric columns that are not part of the sort key.
* If a non-empty column_names_to_sum is specified, then we only take these columns.
* Some columns from column_names_to_sum may not be found. This is ignored.
*/
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
/// Discover nested Maps and find columns for summation
if (typeid_cast<const DataTypeArray *>(column.type.get()))
{
const auto map_name = Nested::extractTableName(column.name);
/// if nested table name ends with `Map` it is a possible candidate for special handling
if (map_name == column.name || !endsWith(map_name, "Map"))
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
discovered_maps[map_name].emplace_back(i);
}
else
{
if (!column.type->isSummable())
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
/// Are they inside the PK?
if (isInPrimaryKey(description, column.name, i))
{
column_numbers_not_to_aggregate.push_back(i);
continue;
}
if (column_names_to_sum.empty()
|| column_names_to_sum.end() !=
std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name))
{
// Create aggregator to sum this column
AggregateDescription desc;
desc.column_numbers = {i};
desc.init("sumWithOverflow", {column.type});
columns_to_aggregate.emplace_back(std::move(desc));
}
else
{
// Column is not going to be summed, use last value
column_numbers_not_to_aggregate.push_back(i);
}
}
}
/// select actual nested Maps from list of candidates
for (const auto & map : discovered_maps)
{
/// map should contain at least two elements (key -> value)
if (map.second.size() < 2)
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
/// no elements of map could be in primary key
auto column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it))
break;
if (column_num_it != map.second.end())
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
DataTypes argument_types = {};
AggregateDescription desc;
MapDescription map_desc;
column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
{
const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it);
const String & name = key_col.name;
const IDataType & nested_type = *static_cast<const DataTypeArray *>(key_col.type.get())->getNestedType();
if (column_num_it == map.second.begin()
|| endsWith(name, "ID")
|| endsWith(name, "Key")
|| endsWith(name, "Type"))
{
if (!nested_type.isValueRepresentedByInteger())
break;
map_desc.key_col_nums.push_back(*column_num_it);
}
else
{
if (!nested_type.isSummable())
break;
map_desc.val_col_nums.push_back(*column_num_it);
}
// Add column to function arguments
desc.column_numbers.push_back(*column_num_it);
argument_types.push_back(key_col.type);
}
if (column_num_it != map.second.end())
{
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
continue;
}
if (map_desc.key_col_nums.size() == 1)
{
// Create summation for all value columns in the map
desc.init("sumMap", argument_types);
columns_to_aggregate.emplace_back(std::move(desc));
}
else
{
// Fall back to legacy mergeMaps for composite keys
for (auto col : map.second)
column_numbers_not_to_aggregate.push_back(col);
maps_to_sum.emplace_back(std::move(map_desc));
}
}
}
// Update aggregation result columns for current block
/// Update aggregation result columns for current block
for (auto & desc : columns_to_aggregate)
{
// Wrap aggregated columns in a tuple to match function signature
@ -332,18 +334,29 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
// Start aggregations with current row
addRow(current);
current_row_is_zero = true;
if (maps_to_sum.empty())
{
/// We have only columns_to_aggregate. The status of current row will be determined
/// in 'insertCurrentRowIfNeeded' method on the values of aggregate functions.
current_row_is_zero = true;
}
else
{
/// We have complex maps that will be summed with 'mergeMap' method.
/// The single row is considered non zero, and the status after merging with other rows
/// will be determined in the branch below (when key_differs == false).
current_row_is_zero = false;
}
}
else
{
addRow(current);
// Merge maps only for same rows
for (auto & desc : maps_to_sum)
{
for (const auto & desc : maps_to_sum)
if (mergeMap(desc, current_row, current))
current_row_is_zero = false;
}
}
if (!current->isLast())

View File

@ -24,14 +24,12 @@ namespace ErrorCodes
class SummingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
SummingSortedBlockInputStream(BlockInputStreams inputs_,
SummingSortedBlockInputStream(
const BlockInputStreams & inputs_,
const SortDescription & description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum_,
size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_), column_names_to_sum(column_names_to_sum_)
{
}
size_t max_block_size_);
String getName() const override { return "SummingSorted"; }
@ -46,7 +44,6 @@ private:
bool finished = false;
/// Columns with which values should be summed.
Names column_names_to_sum; /// If set, it is converted to column_numbers_to_aggregate when initialized.
ColumnNumbers column_numbers_not_to_aggregate;
/** A table can have nested tables that are treated in a special way.

View File

@ -2,6 +2,7 @@
#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
#include <Columns/ColumnsNumber.h>
namespace DB
{
@ -11,6 +12,20 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_,
WriteBuffer * out_row_sources_buf_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2)
, current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_)
{
sign_column_number = header.getPositionByName(sign_column_);
}
inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source)
{
if constexpr (sizeof(RowSourcePart) == 1)
@ -52,15 +67,8 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl()
if (finished)
return {};
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;
bool is_initialized = !first;
init(header, merged_columns);
init(merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::NOT_IMPLEMENTED);
@ -68,11 +76,6 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl()
if (merged_columns.empty())
return {};
/// Additional initialization.
if (!is_initialized)
sign_column_number = header.getPositionByName(sign_column);
merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns));
}

View File

@ -6,6 +6,7 @@
#include <deque>
namespace DB
{
@ -16,6 +17,7 @@ namespace ErrorCodes
static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;
/* Deque with fixed memory size. Allows pushing gaps.
* frontGap() returns the number of gaps were inserted before front.
*
@ -173,15 +175,9 @@ public:
/// Don't need version column. It's in primary key.
/// max_rows_in_queue should be about max_block_size_ if we won't store a lot of extra blocks (RowRef holds SharedBlockPtr).
VersionedCollapsingSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_,
WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, sign_column(sign_column_)
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2)
, current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_)
{
}
const BlockInputStreams & inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_,
WriteBuffer * out_row_sources_buf_ = nullptr);
String getName() const override { return "VersionedCollapsingSorted"; }
@ -190,8 +186,6 @@ protected:
Block readImpl() override;
private:
String sign_column;
size_t sign_column_number = 0;
Logger * log = &Logger::get("VersionedCollapsingSortedBlockInputStream");

View File

@ -1,6 +1,6 @@
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Common/PocoSessionPoolHelpers.h>
#include <Poco/Ext/SessionPoolHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Dictionaries/ODBCDictionarySource.h>
#include <Dictionaries/ODBCBlockInputStream.h>

View File

@ -78,7 +78,9 @@ list(REMOVE_ITEM clickhouse_functions_sources IFunction.cpp FunctionFactory.cpp
list(REMOVE_ITEM clickhouse_functions_headers IFunction.h FunctionFactory.h FunctionHelpers.h)
add_library(clickhouse_functions ${clickhouse_functions_sources})
target_link_libraries(clickhouse_functions PUBLIC dbms PRIVATE libconsistent-hashing)
target_link_libraries(clickhouse_functions PUBLIC dbms PRIVATE libconsistent-hashing ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES})
target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libfarmhash)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libmetrohash/src)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR})

View File

@ -10,6 +10,7 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnArray.h>
#include <Functions/FunctionHelpers.h>
#include <Common/UnicodeBar.h>
#include <Common/UTF8Helpers.h>
@ -30,7 +31,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Storages/IStorage.h>
#include <Common/typeid_cast.h>
#include <TableFunctions/getStructureOfRemoteTable.h>
#include <Storages/getStructureOfRemoteTable.h>
namespace DB
@ -41,6 +42,8 @@ namespace ErrorCodes
extern const int FUNCTION_IS_SPECIAL;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int TOO_SLOW;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int FUNCTION_THROW_IF_VALUE_IS_NON_ZERO;
}
@ -726,6 +729,11 @@ public:
return std::make_shared<DataTypeUInt8>();
}
bool useDefaultImplementationForNulls() const override
{
return false;
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
/// Second argument must be ColumnSet.

View File

@ -22,7 +22,7 @@
#include <Common/ClickHouseRevision.h>
#include <Common/MemoryTracker.h>
#include <Common/typeid_cast.h>
#include <Common/demangle.h>
#include <common/demangle.h>
#include <Interpreters/config_compile.h>

View File

@ -16,7 +16,6 @@
#include <Parsers/formatAST.h>
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/NestedUtils.h>
@ -68,6 +67,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int MULTIPLE_EXPRESSIONS_FOR_ALIAS;
extern const int UNKNOWN_IDENTIFIER;
extern const int CYCLIC_ALIASES;
@ -514,6 +514,7 @@ void ExpressionAnalyzer::analyzeAggregation()
{
getRootActions(select_query->array_join_expression_list(), true, false, temp_actions);
addMultipleArrayJoinAction(temp_actions);
array_join_columns = temp_actions->getSampleBlock().getNamesAndTypesList();
}
if (select_query)
@ -1521,7 +1522,8 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block &
else
{
NamesAndTypesList temp_columns = source_columns;
temp_columns.insert(temp_columns.end(), aggregated_columns.begin(), aggregated_columns.end());
temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end());
temp_columns.insert(temp_columns.end(), columns_added_by_join.begin(), columns_added_by_join.end());
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(temp_columns, settings);
getRootActions(func->arguments->children.at(0), true, false, temp_actions);
@ -1662,10 +1664,7 @@ void ExpressionAnalyzer::makeExplicitSet(const ASTFunction * node, const Block &
else
{
DataTypePtr left_type = sample_block.getByName(left_arg->getColumnName()).type;
if (const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(left_type.get()))
set_element_types.push_back(array_type->getNestedType());
else
set_element_types.push_back(left_type);
set_element_types.push_back(left_type);
}
/// The case `x in (1, 2)` distinguishes from the case `x in 1` (also `x in (1)`).

View File

@ -160,6 +160,8 @@ private:
/// Columns after ARRAY JOIN, JOIN, and/or aggregation.
NamesAndTypesList aggregated_columns;
NamesAndTypesList array_join_columns;
/// The main table in FROM clause, if exists.
StoragePtr storage;

View File

@ -66,7 +66,7 @@ ProcessList::EntryPtr ProcessList::insert(
/// Ask queries to cancel. They will check this flag.
for (auto it = range.first; it != range.second; ++it)
it->second->is_cancelled.store(true, std::memory_order_relaxed);
it->second->is_killed.store(true, std::memory_order_relaxed);
}
}
}

View File

@ -78,7 +78,7 @@ private:
CurrentMetrics::Increment num_queries {CurrentMetrics::Query};
std::atomic<bool> is_cancelled { false };
std::atomic<bool> is_killed { false };
/// Be careful using it. For example, queries field could be modified concurrently.
const ProcessListForUser * user_process_list = nullptr;
@ -140,13 +140,13 @@ public:
if (priority_handle)
priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Could make timeout customizable.
return !is_cancelled.load(std::memory_order_relaxed);
return !is_killed.load(std::memory_order_relaxed);
}
bool updateProgressOut(const Progress & value)
{
progress_out.incrementPiecewiseAtomically(value);
return !is_cancelled.load(std::memory_order_relaxed);
return !is_killed.load(std::memory_order_relaxed);
}
@ -157,7 +157,7 @@ public:
res.query = query;
res.client_info = client_info;
res.elapsed_seconds = watch.elapsedSeconds();
res.is_cancelled = is_cancelled.load(std::memory_order_relaxed);
res.is_cancelled = is_killed.load(std::memory_order_relaxed);
res.read_rows = progress_in.rows;
res.read_bytes = progress_in.bytes;
res.total_rows = progress_in.total_rows;

View File

@ -9,7 +9,6 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeNullable.h>
@ -282,78 +281,44 @@ ColumnPtr Set::execute(const Block & block, bool negative) const
return std::move(res);
}
const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(block.safeGetByPosition(0).type.get());
if (array_type)
if (data_types.size() != num_key_columns)
{
/// Special treatment of Arrays in left hand side of IN:
/// check that at least one array element is in Set.
/// This is deprecated functionality and will be removed.
if (data_types.size() != 1 || num_key_columns != 1)
throw Exception("Number of columns in section IN doesn't match.", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
if (array_type->getNestedType()->isNullable())
throw Exception("Array(Nullable(...)) for left hand side of IN is not supported.", ErrorCodes::NOT_IMPLEMENTED);
if (!array_type->getNestedType()->equals(*data_types[0]))
throw Exception("Types in section IN don't match: " + data_types[0]->getName() +
" on the right, " + array_type->getNestedType()->getName() + " on the left.",
ErrorCodes::TYPE_MISMATCH);
const IColumn * in_column = block.safeGetByPosition(0).column.get();
/// The constant column to the left of IN is not supported directly. For this, it first materializes.
ColumnPtr materialized_column = in_column->convertToFullColumnIfConst();
if (materialized_column)
in_column = materialized_column.get();
if (const ColumnArray * col = typeid_cast<const ColumnArray *>(in_column))
executeArray(col, vec_res, negative);
else
throw Exception("Unexpected array column type: " + in_column->getName(), ErrorCodes::ILLEGAL_COLUMN);
std::stringstream message;
message << "Number of columns in section IN doesn't match. "
<< num_key_columns << " at left, " << data_types.size() << " at right.";
throw Exception(message.str(), ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
}
else
/// Remember the columns we will work with. Also check that the data types are correct.
ColumnRawPtrs key_columns;
key_columns.reserve(num_key_columns);
/// The constant columns to the left of IN are not supported directly. For this, they first materialize.
Columns materialized_columns;
for (size_t i = 0; i < num_key_columns; ++i)
{
if (data_types.size() != num_key_columns)
key_columns.push_back(block.safeGetByPosition(i).column.get());
if (!removeNullable(data_types[i])->equals(*removeNullable(block.safeGetByPosition(i).type)))
throw Exception("Types of column " + toString(i + 1) + " in section IN don't match: "
+ data_types[i]->getName() + " on the right, " + block.safeGetByPosition(i).type->getName() +
" on the left.", ErrorCodes::TYPE_MISMATCH);
if (ColumnPtr converted = key_columns.back()->convertToFullColumnIfConst())
{
std::stringstream message;
message << "Number of columns in section IN doesn't match. "
<< num_key_columns << " at left, " << data_types.size() << " at right.";
throw Exception(message.str(), ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
materialized_columns.emplace_back(converted);
key_columns.back() = materialized_columns.back().get();
}
/// Remember the columns we will work with. Also check that the data types are correct.
ColumnRawPtrs key_columns;
key_columns.reserve(num_key_columns);
/// The constant columns to the left of IN are not supported directly. For this, they first materialize.
Columns materialized_columns;
for (size_t i = 0; i < num_key_columns; ++i)
{
key_columns.push_back(block.safeGetByPosition(i).column.get());
if (!removeNullable(data_types[i])->equals(*removeNullable(block.safeGetByPosition(i).type)))
throw Exception("Types of column " + toString(i + 1) + " in section IN don't match: "
+ data_types[i]->getName() + " on the right, " + block.safeGetByPosition(i).type->getName() +
" on the left.", ErrorCodes::TYPE_MISMATCH);
if (ColumnPtr converted = key_columns.back()->convertToFullColumnIfConst())
{
materialized_columns.emplace_back(converted);
key_columns.back() = materialized_columns.back().get();
}
}
/// We will check existence in Set only for keys, where all components are not NULL.
ColumnPtr null_map_holder;
ConstNullMapPtr null_map{};
extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map);
executeOrdinary(key_columns, vec_res, negative, null_map);
}
/// We will check existence in Set only for keys, where all components are not NULL.
ColumnPtr null_map_holder;
ConstNullMapPtr null_map{};
extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map);
executeOrdinary(key_columns, vec_res, negative, null_map);
return std::move(res);
}
@ -403,38 +368,6 @@ void NO_INLINE Set::executeImplCase(
}
}
template <typename Method>
void NO_INLINE Set::executeArrayImpl(
Method & method,
const ColumnRawPtrs & key_columns,
const ColumnArray::Offsets & offsets,
ColumnUInt8::Container & vec_res,
bool negative,
size_t rows) const
{
typename Method::State state;
state.init(key_columns);
size_t keys_size = key_columns.size();
size_t prev_offset = 0;
/// For all rows
for (size_t i = 0; i < rows; ++i)
{
UInt8 res = 0;
/// For all elements
for (size_t j = prev_offset; j < offsets[i]; ++j)
{
/// Build the key
typename Method::Key key = state.getKey(key_columns, keys_size, j, key_sizes);
res |= negative ^ method.data.has(key);
if (res)
break;
}
vec_res[i] = res;
prev_offset = offsets[i];
}
}
void Set::executeOrdinary(
const ColumnRawPtrs & key_columns,
@ -457,25 +390,6 @@ void Set::executeOrdinary(
}
}
void Set::executeArray(const ColumnArray * key_column, ColumnUInt8::Container & vec_res, bool negative) const
{
size_t rows = key_column->size();
const ColumnArray::Offsets & offsets = key_column->getOffsets();
const IColumn & nested_column = key_column->getData();
switch (data.type)
{
case SetVariants::Type::EMPTY:
break;
#define M(NAME) \
case SetVariants::Type::NAME: \
executeArrayImpl(*data.NAME, ColumnRawPtrs{&nested_column}, offsets, vec_res, negative, rows); \
break;
APPLY_FOR_SET_VARIANTS(M)
#undef M
}
}
MergeTreeSetIndex::MergeTreeSetIndex(const SetElements & set_elements, std::vector<PKTuplePositionMapping> && index_mapping_)
: ordered_set(),
@ -507,6 +421,7 @@ MergeTreeSetIndex::MergeTreeSetIndex(const SetElements & set_elements, std::vect
std::sort(ordered_set.begin(), ordered_set.end());
}
/** Return the BoolMask where:
* 1: the intersection of the set and the range is non-empty
* 2: the range contains elements not in the set

View File

@ -2,7 +2,6 @@
#include <shared_mutex>
#include <Core/Block.h>
#include <Columns/ColumnArray.h>
#include <DataStreams/SizeLimits.h>
#include <DataTypes/IDataType.h>
#include <Interpreters/SetVariants.h>
@ -89,9 +88,6 @@ private:
/// Limitations on the maximum size of the set
SizeLimits limits;
/// If there is an array on the left side of IN. We check that at least one element of the array presents in the set.
void executeArray(const ColumnArray * key_column, ColumnUInt8::Container & vec_res, bool negative) const;
/// If in the left part columns contains the same types as the elements of the set.
void executeOrdinary(
const ColumnRawPtrs & key_columns,
@ -143,15 +139,6 @@ private:
bool negative,
size_t rows,
ConstNullMapPtr null_map) const;
template <typename Method>
void executeArrayImpl(
Method & method,
const ColumnRawPtrs & key_columns,
const ColumnArray::Offsets & offsets,
ColumnUInt8::Container & vec_res,
bool negative,
size_t rows) const;
};
using SetPtr = std::shared_ptr<Set>;

View File

@ -15,7 +15,7 @@ target_include_directories (hash_map BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_link_libraries (hash_map dbms)
add_executable (hash_map3 hash_map3.cpp)
target_link_libraries (hash_map3 dbms)
target_link_libraries (hash_map3 dbms ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES})
add_executable (hash_map_string hash_map_string.cpp)
target_include_directories (hash_map_string BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
@ -25,7 +25,7 @@ add_executable (hash_map_string_2 hash_map_string_2.cpp)
target_link_libraries (hash_map_string_2 dbms)
add_executable (hash_map_string_3 hash_map_string_3.cpp)
target_link_libraries (hash_map_string_3 dbms)
target_link_libraries (hash_map_string_3 dbms ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES})
target_include_directories (hash_map_string_3 BEFORE PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/libfarmhash)
target_include_directories (hash_map_string_3 BEFORE PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/libmetrohash/src)

View File

@ -26,7 +26,10 @@ void ASTSelectWithUnionQuery::formatQueryImpl(const FormatSettings & settings, F
for (ASTs::const_iterator it = list_of_selects->children.begin(); it != list_of_selects->children.end(); ++it)
{
if (it != list_of_selects->children.begin())
settings.ostr << settings.nl_or_ws << indent_str << hilite_keyword << "UNION ALL" << hilite_none << settings.nl_or_ws;
settings.ostr
<< settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "")
<< "UNION ALL" << (settings.hilite ? hilite_keyword : "")
<< settings.nl_or_ws;
(*it)->formatImpl(settings, state, frame);
}

View File

@ -374,6 +374,69 @@ bool ParserCastExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
return true;
}
bool ParserExtractExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto begin = pos;
ParserIdentifier id_parser;
ASTPtr identifier;
if (!id_parser.parse(pos, identifier, expected))
return false;
if (pos->type != TokenType::OpeningRoundBracket)
return false;
++pos;
ASTPtr expr;
const char * function_name = nullptr;
if (ParserKeyword("SECOND").ignore(pos, expected))
function_name = "toSecond";
else if (ParserKeyword("MINUTE").ignore(pos, expected))
function_name = "toMinute";
else if (ParserKeyword("HOUR").ignore(pos, expected))
function_name = "toHour";
else if (ParserKeyword("DAY").ignore(pos, expected))
function_name = "toDayOfMonth";
// TODO: SELECT toRelativeWeekNum(toDate('2017-06-15')) - toRelativeWeekNum(toStartOfYear(toDate('2017-06-15')))
// else if (ParserKeyword("WEEK").ignore(pos, expected))
// function_name = "toRelativeWeekNum";
else if (ParserKeyword("MONTH").ignore(pos, expected))
function_name = "toMonth";
else if (ParserKeyword("YEAR").ignore(pos, expected))
function_name = "toYear";
else
return false;
ParserKeyword s_from("FROM");
if (!s_from.ignore(pos, expected))
return false;
ParserExpression elem_parser;
if (!elem_parser.parse(pos, expr, expected))
return false;
if (pos->type != TokenType::ClosingRoundBracket)
return false;
++pos;
auto function = std::make_shared<ASTFunction>();
auto exp_list = std::make_shared<ASTExpressionList>();
function->range.first = begin->begin;
function->range.second = pos->begin;
function->name = function_name; //"toYear";
function->arguments = exp_list;
function->children.push_back(exp_list);
exp_list->children.push_back(expr);
exp_list->range.first = begin->begin;
exp_list->range.second = pos->begin;
node = function;
return true;
}
bool ParserNull::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
@ -677,7 +740,8 @@ bool ParserExpressionElement::parseImpl(Pos & pos, ASTPtr & node, Expected & exp
ParserArray array_p;
ParserArrayOfLiterals array_lite_p;
ParserLiteral lit_p;
ParserCastExpression fun_p;
ParserExtractExpression extract_p;
ParserCastExpression cast_p;
ParserCompoundIdentifier id_p;
ParserAsterisk asterisk_p;
ParserQualifiedAsterisk qualified_asterisk_p;
@ -697,7 +761,10 @@ bool ParserExpressionElement::parseImpl(Pos & pos, ASTPtr & node, Expected & exp
if (lit_p.parse(pos, node, expected))
return true;
if (fun_p.parse(pos, node, expected))
if (extract_p.parse(pos, node, expected))
return true;
if (cast_p.parse(pos, node, expected))
return true;
if (qualified_asterisk_p.parse(pos, node, expected))

View File

@ -99,6 +99,15 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
class ParserExtractExpression : public IParserBase
{
static constexpr auto name = "EXTRACT";
protected:
const char * getName() const override { return name; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** NULL literal.
*/

View File

@ -0,0 +1,22 @@
#include "iostream_debug_helpers.h"
#include <Parsers/IParser.h>
#include <Parsers/Lexer.h>
#include <Parsers/TokenIterator.h>
namespace DB
{
std::ostream & operator<<(std::ostream & stream, const Token & what) {
stream << "Token (type="<< static_cast<int>(what.type) <<"){"<< std::string{what.begin, what.end} << "}";
return stream;
}
std::ostream & operator<<(std::ostream & stream, const Expected & what) {
stream << "Expected {variants=";
dumpValue(stream, what.variants)
<< "; max_parsed_pos=" << what.max_parsed_pos << "}";
return stream;
}
}

View File

@ -0,0 +1,14 @@
#pragma once
#include <iostream>
namespace DB
{
class Token;
std::ostream & operator<<(std::ostream & stream, const Token & what);
struct Expected;
std::ostream & operator<<(std::ostream & stream, const Expected & what);
}
#include <Core/iostream_debug_helpers.h>

View File

@ -37,7 +37,7 @@ add_library (clickhouse-extract-from-config-lib ${SPLIT_SHARED} ExtractFromConfi
target_link_libraries (clickhouse-extract-from-config-lib clickhouse_common_config clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
add_library (clickhouse-client-lib Client.cpp)
target_link_libraries (clickhouse-client-lib clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions ${LINE_EDITING_LIBS} ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_link_libraries (clickhouse-client-lib clickhouse_functions clickhouse_aggregate_functions ${LINE_EDITING_LIBS} ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_include_directories (clickhouse-client-lib PRIVATE ${READLINE_INCLUDE_DIR})
install (FILES clickhouse-client.xml DESTINATION ${CLICKHOUSE_ETC_DIR}/clickhouse-client COMPONENT clickhouse-client RENAME config.xml)
@ -56,7 +56,7 @@ add_library (clickhouse-format-lib ${SPLIT_SHARED} Format.cpp)
target_link_libraries (clickhouse-format-lib dbms clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
add_library (clickhouse-copier-lib ClusterCopier.cpp)
target_link_libraries (clickhouse-copier-lib clickhouse-server-lib clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions)
target_link_libraries (clickhouse-copier-lib clickhouse-server-lib clickhouse_functions clickhouse_aggregate_functions)
if (USE_EMBEDDED_COMPILER)
link_directories (${LLVM_LIBRARY_DIRS})

View File

@ -192,7 +192,7 @@ private:
host = config.getString("host", "localhost");
port = config.getInt("port",
config.getInt(is_secure ? "tcp_secure_port" : "tcp_port",
config.getInt(is_secure ? "tcp_port_secure" : "tcp_port",
is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
default_database = config.getString("database", "");

View File

@ -134,6 +134,7 @@ void TCPHandler::runImpl()
* The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
*/
std::unique_ptr<Exception> exception;
bool network_error = false;
try
{
@ -183,6 +184,10 @@ void TCPHandler::runImpl()
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
throw;
/// If a timeout occurred, try to inform client about it and close the session
if (e.code() == ErrorCodes::SOCKET_TIMEOUT)
network_error = true;
}
catch (const Poco::Net::NetException & e)
{
@ -211,8 +216,6 @@ void TCPHandler::runImpl()
exception = std::make_unique<Exception>("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
bool network_error = false;
try
{
if (exception)
@ -251,6 +254,14 @@ void TCPHandler::runImpl()
void TCPHandler::readData(const Settings & global_settings)
{
auto receive_timeout = query_context.getSettingsRef().receive_timeout.value;
/// Poll interval should not be greater than receive_timeout
size_t default_poll_interval = global_settings.poll_interval.value * 1000000;
size_t current_poll_interval = static_cast<size_t>(receive_timeout.totalMicroseconds());
constexpr size_t min_poll_interval = 5000; // 5 ms
size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval));
while (1)
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
@ -258,7 +269,7 @@ void TCPHandler::readData(const Settings & global_settings)
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
while (1)
{
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(global_settings.poll_interval * 1000000))
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(poll_interval))
break;
/// Do we need to shut down?
@ -269,8 +280,16 @@ void TCPHandler::readData(const Settings & global_settings)
* If we periodically poll, the receive_timeout of the socket itself does not work.
* Therefore, an additional check is added.
*/
if (watch.elapsedSeconds() > global_settings.receive_timeout.totalSeconds())
throw Exception("Timeout exceeded while receiving data from client", ErrorCodes::SOCKET_TIMEOUT);
double elapsed = watch.elapsedSeconds();
if (elapsed > receive_timeout.totalSeconds())
{
std::stringstream ss;
ss << "Timeout exceeded while receiving data from client.";
ss << " Waited for " << static_cast<size_t>(elapsed) << " seconds,";
ss << " timeout is " << receive_timeout.totalSeconds() << " seconds.";
throw Exception(ss.str(), ErrorCodes::SOCKET_TIMEOUT);
}
}
/// If client disconnected.
@ -560,7 +579,7 @@ bool TCPHandler::receivePacket()
return false;
default:
throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
throw Exception("Unknown packet " + toString(packet_type) + " from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
}

View File

@ -4,5 +4,13 @@
<host>localhost</host>
<port>2181</port>
</node>
<node>
<host>yandex.ru</host>
<port>2181</port>
</node>
<node>
<host>111.0.1.2</host>
<port>2181</port>
</node>
</zookeeper>
</yandex>

View File

@ -76,6 +76,11 @@
<!-- Don't exit if ipv6 or ipv4 unavailable, but listen_host with this protocol specified -->
<!-- <listen_try>0</listen_try> -->
<!-- Allow listen on same address:port -->
<!-- <listen_reuse_port>0</listen_reuse_port> -->
<!-- <listen_backlog>64</listen_backlog> -->
<max_connections>4096</max_connections>
<keep_alive_timeout>3</keep_alive_timeout>

View File

@ -1,8 +1,10 @@
#include <iostream>
#include <common/config_common.h>
#include <Common/config.h>
#include <config_tools.h>
#include <iostream>
#include <vector>
#include <string>
#include <utility> /// pair
#if USE_TCMALLOC
#include <gperftools/malloc_extension.h>

View File

@ -2172,7 +2172,7 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit()
return total_covered_parts;
}
bool MergeTreeData::isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const
bool MergeTreeData::isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const
{
String column_name = node->getColumnName();
@ -2180,9 +2180,12 @@ bool MergeTreeData::isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr &
if (column_name == column.column_name)
return true;
if (partition_expr_ast && partition_expr_ast->children.at(0)->getColumnName() == column_name)
return true;
if (const ASTFunction * func = typeid_cast<const ASTFunction *>(node.get()))
if (func->arguments->children.size() == 1)
return isPrimaryKeyColumnPossiblyWrappedInFunctions(func->arguments->children.front());
return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(func->arguments->children.front());
return false;
}
@ -2195,15 +2198,15 @@ bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) con
if (left_in_operand_tuple && left_in_operand_tuple->name == "tuple")
{
for (const auto & item : left_in_operand_tuple->arguments->children)
if (isPrimaryKeyColumnPossiblyWrappedInFunctions(item))
if (isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(item))
return true;
/// The tuple itself may be part of the primary key, so check that as a last resort.
return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand);
return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(left_in_operand);
}
else
{
return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand);
return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(left_in_operand);
}
}

View File

@ -652,7 +652,7 @@ private:
std::lock_guard<std::mutex> & data_parts_lock) const;
/// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument.
bool isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr &node) const;
bool isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const;
};
}

View File

@ -555,8 +555,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
settings.use_uncompressed_cache,
query_info.prewhere_info,
virt_column_names,
settings,
context);
settings);
}
else
{
@ -733,8 +732,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings,
const Context & context) const
const Settings & settings) const
{
const size_t max_marks_to_use_cache =
(settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
@ -764,63 +762,43 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.getPrimaryExpression()));
}
BlockInputStreams res;
if (to_merge.size() == 1)
BlockInputStreamPtr merged;
switch (data.merging_params.mode)
{
if (data.merging_params.mode == MergeTreeData::MergingParams::Collapsing)
{
ExpressionActionsPtr sign_filter_expression;
String sign_filter_column;
case MergeTreeData::MergingParams::Ordinary:
merged = std::make_shared<MergingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
break;
createPositiveSignCondition(sign_filter_expression, sign_filter_column, context);
case MergeTreeData::MergingParams::Collapsing:
merged = std::make_shared<CollapsingFinalBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column);
break;
res.emplace_back(std::make_shared<FilterBlockInputStream>(to_merge[0], sign_filter_expression, sign_filter_column));
}
else
res = to_merge;
}
else if (to_merge.size() > 1)
{
BlockInputStreamPtr merged;
case MergeTreeData::MergingParams::Summing:
merged = std::make_shared<SummingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size);
break;
switch (data.merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
merged = std::make_shared<MergingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
break;
case MergeTreeData::MergingParams::Aggregating:
merged = std::make_shared<AggregatingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
break;
case MergeTreeData::MergingParams::Collapsing:
merged = std::make_shared<CollapsingFinalBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column);
break;
case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream
merged = std::make_shared<ReplacingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.version_column, max_block_size);
break;
case MergeTreeData::MergingParams::Summing:
merged = std::make_shared<SummingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size);
break;
case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream
merged = std::make_shared<VersionedCollapsingSortedBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column, max_block_size, true);
break;
case MergeTreeData::MergingParams::Aggregating:
merged = std::make_shared<AggregatingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
break;
case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream
merged = std::make_shared<ReplacingSortedBlockInputStream>(to_merge,
data.getSortDescription(), data.merging_params.version_column, max_block_size);
break;
case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream
merged = std::make_shared<VersionedCollapsingSortedBlockInputStream>(
to_merge, data.getSortDescription(), data.merging_params.sign_column, max_block_size, true);
break;
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
res.emplace_back(merged);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
return res;
return {merged};
}

View File

@ -53,8 +53,7 @@ private:
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
const Names & virt_columns,
const Settings & settings,
const Context & context) const;
const Settings & settings) const;
/// Get the approximate value (bottom estimate - only by full marks) of the number of rows falling under the index.
size_t getApproximateTotalRowsToRead(

View File

@ -104,6 +104,9 @@ struct MergeTreeSettings
\
/** Period to clean old queue logs, blocks hashes and parts */ \
M(SettingUInt64, cleanup_delay_period, 30) \
/** Add uniformly distributed value from 0 to x seconds to cleanup_delay_period \
to avoid thundering herd effect and subsequent DoS of ZooKeeper in case of very large number of tables */ \
M(SettingUInt64, cleanup_delay_period_random_add, 10) \
\
/** Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited. */ \
M(SettingUInt64, min_relative_delay_to_yield_leadership, 120) \

View File

@ -1006,6 +1006,10 @@ bool PKCondition::mayBeTrueInRangeImpl(const std::vector<Range> & key_ranges, co
rpn_stack.back() = !rpn_stack.back();
}
}
else
{
throw Exception("Set for IN is not created yet!", ErrorCodes::LOGICAL_ERROR);
}
}
else if (element.function == RPNElement::FUNCTION_NOT)
{

View File

@ -190,7 +190,7 @@ void ReplicatedMergeTreeAlterThread::run()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
force_recheck_parts = true;

View File

@ -3,6 +3,8 @@
#include <Common/setThreadName.h>
#include <Poco/Timestamp.h>
#include <random>
namespace DB
{
@ -25,7 +27,8 @@ void ReplicatedMergeTreeCleanupThread::run()
{
setThreadName("ReplMTCleanup");
const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000;
const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000
+ std::uniform_int_distribution<UInt64>(0, storage.data.settings.cleanup_delay_period_random_add * 1000)(rng);
while (!storage.shutdown_called)
{
@ -35,7 +38,7 @@ void ReplicatedMergeTreeCleanupThread::run()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
storage.cleanup_thread_event.tryWait(CLEANUP_SLEEP_MS);
@ -52,7 +55,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
/// This is loose condition: no problem if we actually had lost leadership at this moment
/// and two replicas will try to do cleanup simultaneously.
if (storage.is_leader_node)
if (storage.is_leader)
{
clearOldLogs();
clearOldBlocks();

View File

@ -7,6 +7,8 @@
#include <thread>
#include <map>
#include <pcg_random.hpp>
namespace DB
{
@ -27,6 +29,7 @@ private:
StorageReplicatedMergeTree & storage;
Logger * log;
std::thread thread;
pcg64 rng;
void run();
void iterate();

View File

@ -265,7 +265,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
{
/// TODO Better to check error code.
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
LOG_ERROR(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
@ -383,7 +383,7 @@ void ReplicatedMergeTreePartCheckThread::run()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
wakeup_event.tryWait(PART_CHECK_ERROR_SLEEP_MS);
}
}

View File

@ -17,7 +17,6 @@ namespace ProfileEvents
namespace CurrentMetrics
{
extern const Metric ReadonlyReplica;
extern const Metric LeaderReplica;
}
@ -93,7 +92,7 @@ void ReplicatedMergeTreeRestartingThread::run()
catch (const zkutil::KeeperException & e)
{
/// The exception when you try to zookeeper_init usually happens if DNS does not work. We will try to do it again.
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (first_time)
storage.startup_event.set();
@ -139,7 +138,7 @@ void ReplicatedMergeTreeRestartingThread::run()
prev_time_of_check_delay = current_time;
/// We give up leadership if the relative lag is greater than threshold.
if (storage.is_leader_node
if (storage.is_leader
&& relative_delay > static_cast<time_t>(storage.data.settings.min_relative_delay_to_yield_leadership))
{
LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
@ -147,18 +146,18 @@ void ReplicatedMergeTreeRestartingThread::run()
ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
storage.is_leader_node = false;
CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
if (storage.merge_selecting_thread.joinable())
storage.merge_selecting_thread.join();
storage.leader_election->yield();
storage.exitLeaderElection();
/// NOTE: enterLeaderElection() can throw if node creation in ZK fails.
/// This is bad because we can end up without a leader on any replica.
/// In this case we rely on the fact that the session will expire and we will reconnect.
storage.enterLeaderElection();
}
}
}
catch (...)
{
storage.startup_event.set();
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
wakeup_event.tryWait(check_period_ms);
@ -169,6 +168,8 @@ void ReplicatedMergeTreeRestartingThread::run()
storage.data_parts_exchange_endpoint_holder->cancelForever();
storage.data_parts_exchange_endpoint_holder = nullptr;
/// Cancel fetches and merges to force the queue_task to finish ASAP.
storage.fetcher.blocker.cancelForever();
storage.merger.merges_blocker.cancelForever();
partialShutdown();
@ -179,7 +180,7 @@ void ReplicatedMergeTreeRestartingThread::run()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
LOG_DEBUG(log, "Restarting thread finished");
@ -195,12 +196,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
updateQuorumIfWeHavePart();
if (storage.data.settings.replicated_can_become_leader)
storage.leader_election = std::make_shared<zkutil::LeaderElection>(
storage.zookeeper_path + "/leader_election",
*storage.current_zookeeper, /// current_zookeeper lives for the lifetime of leader_election,
/// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method.
[this] { storage.becomeLeader(); CurrentMetrics::add(CurrentMetrics::LeaderReplica); },
storage.replica_name);
storage.enterLeaderElection();
/// Anything above can throw a KeeperException if something is wrong with ZK.
/// Anything below should not throw exceptions.
@ -222,7 +218,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
catch (...)
{
storage.replica_is_active_node = nullptr;
storage.leader_election = nullptr;
try
{
@ -366,17 +361,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.replica_is_active_node = nullptr;
LOG_TRACE(log, "Waiting for threads to finish");
{
std::lock_guard<std::mutex> lock(storage.leader_node_mutex);
bool old_val = true;
if (storage.is_leader_node.compare_exchange_strong(old_val, false))
{
CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
if (storage.merge_selecting_thread.joinable())
storage.merge_selecting_thread.join();
}
}
storage.exitLeaderElection();
if (storage.queue_updating_thread.joinable())
storage.queue_updating_thread.join();
@ -384,20 +371,6 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.alter_thread.reset();
storage.part_check_thread.stop();
/// Yielding leadership only after finish of merge_selecting_thread.
/// Otherwise race condition with parallel run of merge selecting thread on different servers is possible.
///
/// On the other hand, leader_election could call becomeLeader() from own thread after
/// merge_selecting_thread is finished and restarting_thread is destroyed.
/// becomeLeader() recreates merge_selecting_thread and it becomes joinable again, even restarting_thread is destroyed.
/// But restarting_thread is responsible to stop merge_selecting_thread.
/// It will lead to std::terminate in ~StorageReplicatedMergeTree().
/// Such behaviour was rarely observed on DROP queries.
/// Therefore we need either avoid becoming leader after first shutdown call (more deliberate choice),
/// either manually wait merge_selecting_thread.join() inside ~StorageReplicatedMergeTree(), either or something third.
/// So, we added shutdown check in becomeLeader() and made its creation and deletion atomic.
storage.leader_election = nullptr;
LOG_TRACE(log, "Threads finished");
}

View File

@ -1,4 +1,4 @@
#include <Common/PocoSessionPoolHelpers.h>
#include <Poco/Ext/SessionPoolHelpers.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Storages/StorageODBC.h>
#include <Storages/StorageFactory.h>

View File

@ -59,6 +59,12 @@ namespace ProfileEvents
extern const Event DataAfterMergeDiffersFromReplica;
}
namespace CurrentMetrics
{
extern const Metric LeaderReplica;
}
namespace DB
{
@ -205,40 +211,26 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
/// If zookeeper chroot prefix is used, path should starts with '/', because chroot concatenates without it.
/// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
zookeeper_path = "/" + zookeeper_path;
replica_path = zookeeper_path + "/replicas/" + replica_name;
bool skip_sanity_checks = false;
try
if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data"))
{
if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data"))
{
skip_sanity_checks = true;
current_zookeeper->remove(replica_path + "/flags/force_restore_data");
skip_sanity_checks = true;
current_zookeeper->remove(replica_path + "/flags/force_restore_data");
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag "
<< replica_path << "/flags/force_restore_data).");
}
else if (has_force_restore_data_flag)
{
skip_sanity_checks = true;
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data).");
}
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag "
<< replica_path << "/flags/force_restore_data).");
}
catch (const zkutil::KeeperException & e)
else if (has_force_restore_data_flag)
{
/// Failed to connect to ZK (this became known when trying to perform the first operation).
if (e.code == ZooKeeperImpl::ZooKeeper::ZCONNECTIONLOSS)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
current_zookeeper = nullptr;
}
else
throw;
skip_sanity_checks = true;
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data).");
}
data.loadDataParts(skip_sanity_checks);
@ -1268,7 +1260,7 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
/** Removing old chunks from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
*/
/** With `ZCONNECTIONLOSS` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
/** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
* This is not a problem, because in this case the merge will remain in the queue, and we will try again.
*/
merge_selecting_event.set();
@ -1468,7 +1460,7 @@ bool StorageReplicatedMergeTree::executeFetch(const StorageReplicatedMergeTree::
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
throw;
@ -1606,7 +1598,7 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
queue_updating_event->tryWait(QUEUE_UPDATE_ERROR_SLEEP_MS);
}
}
@ -1626,7 +1618,7 @@ bool StorageReplicatedMergeTree::queueTask()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
LogEntryPtr & entry = selected.first;
@ -1660,7 +1652,7 @@ bool StorageReplicatedMergeTree::queueTask()
LOG_INFO(log, e.displayText());
}
else
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
/** This exception will be written to the queue element, and it can be looked up using `system.replication_queue` table.
* The thread that performs this action will sleep a few seconds after the exception.
@ -1670,7 +1662,7 @@ bool StorageReplicatedMergeTree::queueTask()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
throw;
}
});
@ -1883,7 +1875,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
&& cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
};
while (!shutdown_called && is_leader_node)
while (is_leader)
{
bool success = false;
@ -1929,10 +1921,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
if (shutdown_called || !is_leader_node)
if (!is_leader)
break;
if (!success)
@ -2037,23 +2029,55 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
}
void StorageReplicatedMergeTree::becomeLeader()
void StorageReplicatedMergeTree::enterLeaderElection()
{
std::lock_guard<std::mutex> lock(leader_node_mutex);
auto callback = [this]()
{
CurrentMetrics::add(CurrentMetrics::LeaderReplica);
LOG_INFO(log, "Became leader");
if (shutdown_called)
is_leader = true;
merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
};
try
{
leader_election = std::make_shared<zkutil::LeaderElection>(
zookeeper_path + "/leader_election",
*current_zookeeper, /// current_zookeeper lives for the lifetime of leader_election,
/// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method.
callback,
replica_name);
}
catch (...)
{
leader_election = nullptr;
throw;
}
}
void StorageReplicatedMergeTree::exitLeaderElection()
{
if (!leader_election)
return;
if (merge_selecting_thread.joinable())
/// Shut down the leader election thread to avoid suddenly becoming the leader again after
/// we have stopped the merge_selecting_thread, but before we have deleted the leader_election object.
leader_election->shutdown();
if (is_leader)
{
LOG_INFO(log, "Deleting old leader");
is_leader_node = false; /// exit trigger inside thread
CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
LOG_INFO(log, "Stopped being leader");
is_leader = false;
merge_selecting_event.set();
merge_selecting_thread.join();
}
LOG_INFO(log, "Became leader");
is_leader_node = true;
merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
/// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one
/// replica assigns merges at any given time.
leader_election = nullptr;
}
@ -2382,12 +2406,6 @@ void StorageReplicatedMergeTree::startup()
void StorageReplicatedMergeTree::shutdown()
{
/** This must be done before waiting for restarting_thread.
* Because restarting_thread will wait for finishing of tasks in background pool,
* and parts are fetched in that tasks.
*/
fetcher.blocker.cancelForever();
if (restarting_thread)
{
restarting_thread->stop();
@ -2399,6 +2417,8 @@ void StorageReplicatedMergeTree::shutdown()
data_parts_exchange_endpoint_holder->cancelForever();
data_parts_exchange_endpoint_holder = nullptr;
}
fetcher.blocker.cancelForever();
}
@ -2487,7 +2507,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
{
assertNotReadonly();
if (!is_leader_node)
if (!is_leader)
{
sendRequestToLeaderReplica(query, context.getSettingsRef());
return true;
@ -2813,7 +2833,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
if (!is_leader_node)
if (!is_leader)
{
sendRequestToLeaderReplica(query, context.getSettingsRef());
return;
@ -3171,7 +3191,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
{
auto zookeeper = tryGetZooKeeper();
res.is_leader = is_leader_node;
res.is_leader = is_leader;
res.is_readonly = is_readonly;
res.is_session_expired = !zookeeper || zookeeper->expired();
@ -3637,7 +3657,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
NameSet * parts_should_be_retied)
NameSet * parts_should_be_retried)
{
zkutil::Requests ops;
auto it_first_node_in_batch = part_names.cbegin();
@ -3668,9 +3688,9 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr &
{
LOG_DEBUG(log, "There is no part " << *it_in_batch << " in ZooKeeper, it was only in filesystem");
}
else if (parts_should_be_retied && zkutil::isHardwareError(cur_code))
else if (parts_should_be_retried && zkutil::isHardwareError(cur_code))
{
parts_should_be_retied->emplace(*it_in_batch);
parts_should_be_retried->emplace(*it_in_batch);
}
else if (cur_code)
{
@ -3678,10 +3698,10 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr &
}
}
}
else if (parts_should_be_retied && zkutil::isHardwareError(code))
else if (parts_should_be_retried && zkutil::isHardwareError(code))
{
for (auto it_in_batch = it_first_node_in_batch; it_in_batch != it_next; ++it_in_batch)
parts_should_be_retied->emplace(*it_in_batch);
parts_should_be_retried->emplace(*it_in_batch);
}
else if (code)
{

View File

@ -220,8 +220,8 @@ private:
/** Is this replica "leading". The leader replica selects the parts to merge.
*/
std::atomic_bool is_leader_node {false};
std::mutex leader_node_mutex;
std::atomic<bool> is_leader {false};
zkutil::LeaderElectionPtr leader_election;
InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder;
@ -239,7 +239,6 @@ private:
DataPartsExchange::Fetcher fetcher;
zkutil::LeaderElectionPtr leader_election;
/// When activated, replica is initialized and startup() method could exit
Poco::Event startup_event;
@ -334,7 +333,7 @@ private:
/// Quickly removes big set of parts from ZooKeeper (using async multi queries)
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
NameSet * parts_should_be_retied = nullptr);
NameSet * parts_should_be_retried = nullptr);
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
void removePartAndEnqueueFetch(const String & part_name);
@ -368,9 +367,15 @@ private:
*/
bool queueTask();
/// Select the parts to merge.
/// Postcondition:
/// either leader_election is fully initialized (node in ZK is created and the watching thread is launched)
/// or an exception is thrown and leader_election is destroyed.
void enterLeaderElection();
void becomeLeader();
/// Postcondition:
/// is_leader is false, merge_selecting_thread is stopped, leader_election is nullptr.
/// leader_election node in ZK is either deleted, or the session is marked expired.
void exitLeaderElection();
/** Selects the parts to merge and writes to the log.
*/

View File

@ -1,4 +1,4 @@
#include <TableFunctions/getStructureOfRemoteTable.h>
#include "getStructureOfRemoteTable.h"
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterDescribeQuery.h>

View File

@ -5,7 +5,7 @@ list(REMOVE_ITEM clickhouse_table_functions_sources ITableFunction.cpp TableFunc
list(REMOVE_ITEM clickhouse_table_functions_headers ITableFunction.h TableFunctionFactory.h)
add_library(clickhouse_table_functions ${clickhouse_table_functions_sources})
target_link_libraries(clickhouse_table_functions dbms clickhouse_storages_system ${Poco_Foundation_LIBRARY})
target_link_libraries(clickhouse_table_functions clickhouse_storages_system dbms ${Poco_Foundation_LIBRARY})
if (Poco_SQLODBC_FOUND)
target_link_libraries (clickhouse_table_functions ${Poco_SQLODBC_LIBRARY})

View File

@ -1,4 +1,4 @@
#include <TableFunctions/getStructureOfRemoteTable.h>
#include <Storages/getStructureOfRemoteTable.h>
#include <Storages/StorageDistributed.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>

View File

@ -1,4 +1,4 @@
#include <TableFunctions/getStructureOfRemoteTable.h>
#include <Storages/getStructureOfRemoteTable.h>
#include <Storages/StorageDistributed.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>

View File

@ -1,3 +1,4 @@
<yandex>
<tcp_port_secure>59440</tcp_port_secure>
<insert_format_max_block_size>100000</insert_format_max_block_size>
</yandex>

View File

@ -3,8 +3,6 @@
1
1
1
1
1
0
0
1
@ -15,16 +13,6 @@
0
1
0
1
0
1
1
0
0
1
1
0
1
1 (2,3)
2 (2,3)
3 (2,3)

View File

@ -1,7 +1,5 @@
SELECT 1 IN (1, 2, 3);
SELECT toUInt16(1) IN (1, 1000, 3);
SELECT [1, 2, 3] IN (3, 4, 5);
SELECT materialize([1, 2, 3]) IN (3, 4, 5);
SELECT 'Hello' IN ('Hello', 'world');
SELECT (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17) IN ((1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17));
SELECT (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, '') IN ((1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, ''));
@ -9,16 +7,6 @@ SELECT (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, '') IN (1, 2, 3,
SELECT (number AS n, n + 1, n + 2, n + 3) IN (1, 2, 3, 4) FROM system.numbers LIMIT 3;
SELECT (number AS n, n + 1, n + 2, n + 3, n - 1) IN (1, 2, 3, 4, 0) FROM system.numbers LIMIT 3;
SELECT (number AS n, n + 1, toString(n + 2), n + 3, n - 1) IN (1, 2, '3', 4, 0) FROM system.numbers LIMIT 3;
SELECT [1, 2, 3] IN (2);
SELECT [1, 2, 3] IN (4);
SELECT [1, 2, 3] NOT IN (1);
SELECT [1, 2, 3] NOT IN (1, 2);
SELECT [1, 2, 3] NOT IN (1, 2, 3);
SELECT [1, 2, 3] NOT IN (1, 2, 3, 4);
SELECT ['Hello', 'world'] IN ('world');
SELECT ['Hello', 'world'] NOT IN ('world');
SELECT ['Hello', 'world'] NOT IN ('Hello', 'world');
SELECT ['Hello', 'world'] NOT IN ('hello', 'world');
SELECT number, tuple FROM (SELECT 1 AS number, (2, 3) AS tuple) WHERE (number, tuple) IN (((1, (2, 3)), (4, (5, 6))));
SELECT number, tuple FROM (SELECT 2 AS number, (2, 3) AS tuple) WHERE (number, tuple) IN ((2, (2, 3)));
SELECT number, tuple FROM (SELECT 3 AS number, (2, 3) AS tuple) WHERE (number, tuple) IN (3, (2, 3));

View File

@ -8,12 +8,12 @@ $CLICKHOUSE_CLIENT -n --query="
CREATE VIEW test.numbers_100k AS SELECT * FROM system.numbers LIMIT 100000;
";
STEP_MULTIPLIER=1
STEP_MULTIPLIER=25
if [ -n "$DBMS_TESTS_UNDER_VALGRIND" ]; then
STEP_MULTIPLIER=50
STEP_MULTIPLIER=1000
fi
for i in $(seq 1000000 $((20000 * $STEP_MULTIPLIER)) 10000000 && seq 10100000 $((100000 * $STEP_MULTIPLIER)) 20000000); do
for i in $(seq 1000000 $((20000 * $STEP_MULTIPLIER)) 10000000 && seq 10100000 $((100000 * $STEP_MULTIPLIER)) 50000000); do
$CLICKHOUSE_CLIENT --max_memory_usage=$i --query="
SELECT intDiv(number, 5) AS k, max(toString(number)) FROM remote('127.0.0.{2,3}', test.numbers_100k) GROUP BY k ORDER BY k LIMIT 1;
" 2> /dev/null;

View File

@ -1,3 +1,2 @@
SELECT min(number) FROM system.numbers WHERE toUInt64(number % 1000) IN (SELECT DISTINCT blockSize() FROM system.numbers SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break') SETTINGS max_rows_to_read = 1000000, read_overflow_mode = 'break';
SELECT * FROM (SELECT DISTINCT blockSize() AS x FROM system.numbers SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break');
SELECT x FROM (SELECT DISTINCT blockSize() AS x FROM remote('127.0.0.{2,3}', system.numbers) WHERE number IN (SELECT number * 2 FROM system.numbers SETTINGS max_rows_to_read = 10000, read_overflow_mode = 'break') SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break') ORDER BY x;

View File

@ -1,12 +1,7 @@
DROP TABLE IF EXISTS test.summing_composite_key;
CREATE TABLE test.summing_composite_key (d Date, k UInt64, FirstMap Nested(k1 UInt32, k2ID Int8, s Float64), SecondMap Nested(k1ID UInt64, k2Key UInt32, k3Type Int32, s Int64)) ENGINE = SummingMergeTree(d, k, 1);
INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 1, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]);
INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 1, [2,1], [4,3], [20,22], [2,2,1], [5,5,0], [-3,-3,-33], [10,100,1000]);
INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 2, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]);
INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 2, [2,1,1], [4,3,3], [20,22,33], [2,2], [5,5], [-3,-3], [10,100]);
INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 2, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]);
INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 1, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]), ('2000-01-01', 1, [2,1], [4,3], [20,22], [2,2,1], [5,5,0], [-3,-3,-33], [10,100,1000]), ('2000-01-01', 2, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]), ('2000-01-01', 2, [2,1,1], [4,3,3], [20,22,33], [2,2], [5,5], [-3,-3], [10,100]), ('2000-01-01', 2, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]);
SELECT * FROM test.summing_composite_key ORDER BY d, k, _part_index;

View File

@ -5,12 +5,12 @@
0
0
1
\N
0
1
0
0
1
\N
0
1
0
0
@ -27,7 +27,7 @@
1
0
1
\N
0
0
1
0
@ -35,12 +35,12 @@
0
0
1
\N
0
1
0
0
1
\N
0
1
0
0
@ -57,7 +57,7 @@
1
0
1
\N
0
0
1
0

View File

@ -127,9 +127,9 @@ INSERT INTO test.two (x, y) VALUES ([4294967290], [4294967290]);
INSERT INTO test.one (x, y) VALUES ([4294967299], [4294967299]);
INSERT INTO test.two (x, y) VALUES ([4294967299], [4294967299]);
SELECT x, y FROM test.merge_one_two WHERE x IN (1);
SELECT x, y FROM test.merge_one_two WHERE x IN (4294967290);
SELECT x, y FROM test.merge_one_two WHERE x IN (4294967299);
SELECT x, y FROM test.merge_one_two WHERE arrayExists(_ -> _ IN (1), x);
SELECT x, y FROM test.merge_one_two WHERE arrayExists(_ -> _ IN (4294967290), x);
SELECT x, y FROM test.merge_one_two WHERE arrayExists(_ -> _ IN (4294967299), x);
DROP TABLE IF EXISTS test.one;
DROP TABLE IF EXISTS test.two;

View File

@ -1 +1,3 @@
Still alive
Still alive
Still alive

View File

@ -1 +1,3 @@
Still alive
Still alive
Still alive

View File

@ -5,7 +5,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
set -e -o pipefail
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT sum(ignore(*)) FROM (SELECT number % 1000 AS k, groupArray(number) FROM numbers(100000000) GROUP BY k)' &
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT sum(ignore(*)) FROM (SELECT number % 1000 AS k, groupArray(number) FROM numbers(100000000) GROUP BY k)' 2>&1 > /dev/null &
sleep 0.1 # First query (usually) should be received by the server after this sleep.
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT 1 WHERE 0'
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT 0'
wait

View File

@ -11,3 +11,27 @@ key, arrayJoin(arr) in ((1, 1), (2, 2))
(key, left array join arr) in ((1, 1), (2, 2))
1
2
all
1 [1]
2 [2]
key, arrayJoin(n.x) in (1, 1)
1 1
key, arrayJoin(n.x) in ((1, 1), (2, 2))
1 1
2 2
(key, left array join n.x) in (1, 1)
1
(key, left array join n.x) in ((1, 1), (2, 2))
1
2
max(key) from tab where (key, left array join n.x) in (1, 1)
1
1
max(key) from tab where (key, left array join n.x) in ((1, 1), (2, 2))
2
2
max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in (1, 1)
1
max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2))
2
1

View File

@ -14,3 +14,32 @@ select key from test.tab left array join arr as val where (key, val) in (1, 1);
select '(key, left array join arr) in ((1, 1), (2, 2))';
select key from test.tab left array join arr as val where (key, val) in ((1, 1), (2, 2)) order by key;
drop table if exists test.tab;
create table test.tab (key UInt64, n Nested(x UInt64)) Engine = MergeTree order by key;
insert into test.tab values (1, [1]);
insert into test.tab values (2, [2]);
select 'all';
select * from test.tab order by key;
select 'key, arrayJoin(n.x) in (1, 1)';
select key, arrayJoin(n.x) as val from test.tab where (key, val) in (1, 1);
select 'key, arrayJoin(n.x) in ((1, 1), (2, 2))';
select key, arrayJoin(n.x) as val from test.tab where (key, val) in ((1, 1), (2, 2)) order by key;
select '(key, left array join n.x) in (1, 1)';
select key from test.tab left array join n.x as val where (key, val) in (1, 1);
select '(key, left array join n.x) in ((1, 1), (2, 2))';
select key from test.tab left array join n.x as val where (key, val) in ((1, 1), (2, 2)) order by key;
select 'max(key) from tab where (key, left array join n.x) in (1, 1)';
select max(key) from test.tab left array join `n.x` as val where (key, val) in ((1, 1));
select max(key) from test.tab left array join n as val where (key, val.x) in (1, 1);
select 'max(key) from tab where (key, left array join n.x) in ((1, 1), (2, 2))';
select max(key) from test.tab left array join `n.x` as val where (key, val) in ((1, 1), (2, 2));
select max(key) from test.tab left array join n as val where (key, val.x) in ((1, 1), (2, 2));
select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in (1, 1)';
select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in (1, 1);
select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2))';
select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in ((1, 1), (2, 2));
drop table if exists test.tab;
CREATE TABLE test.tab (key1 Int32, id1 Int64, c1 Int64) ENGINE = MergeTree PARTITION BY id1 ORDER BY (key1) ;
insert into test.tab values ( -1, 1, 0 );
SELECT count(*) FROM test.tab PREWHERE id1 IN (1);

View File

@ -0,0 +1 @@
2017-09-10 ['a','b']

View File

@ -0,0 +1,6 @@
USE test;
DROP TABLE IF EXISTS test;
CREATE TABLE test (date Date, keys Array(Nullable(String))) ENGINE = MergeTree(date, date, 1);
INSERT INTO test VALUES ('2017-09-10', ['a', 'b']);
SELECT * FROM test LIMIT 1;
DROP TABLE test;

Some files were not shown because too many files have changed in this diff Show More