Merge remote-tracking branch 'upstream/master' into fix3

This commit is contained in:
proller 2018-04-04 13:12:10 +03:00
commit 786f9ca5bb
49 changed files with 368 additions and 296 deletions

View File

@ -9,7 +9,7 @@ if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/poco/CMakeLists.txt")
endif ()
if (NOT USE_INTERNAL_POCO_LIBRARY)
find_package (Poco COMPONENTS Net NetSSL XML Data Crypto DataODBC MongoDB)
find_package (Poco COMPONENTS Net NetSSL XML SQL Data Crypto DataODBC MongoDB)
endif ()
if (Poco_INCLUDE_DIRS AND Poco_Foundation_LIBRARY)
@ -24,6 +24,15 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
set (ENABLE_DATA_SQLITE 0 CACHE BOOL "")
set (ENABLE_DATA_MYSQL 0 CACHE BOOL "")
set (ENABLE_DATA_POSTGRESQL 0 CACHE BOOL "")
# new after 2.0.0:
set (POCO_ENABLE_ZIP 0 CACHE BOOL "")
set (POCO_ENABLE_PAGECOMPILER 0 CACHE BOOL "")
set (POCO_ENABLE_PAGECOMPILER_FILE2PAGE 0 CACHE BOOL "")
set (POCO_ENABLE_REDIS 0 CACHE BOOL "")
set (POCO_ENABLE_SQL_SQLITE 0 CACHE BOOL "")
set (POCO_ENABLE_SQL_MYSQL 0 CACHE BOOL "")
set (POCO_ENABLE_SQL_POSTGRESQL 0 CACHE BOOL "")
set (POCO_UNBUNDLED 1 CACHE BOOL "")
set (POCO_UNBUNDLED_PCRE 0 CACHE BOOL "")
set (POCO_UNBUNDLED_EXPAT 0 CACHE BOOL "")
@ -44,9 +53,25 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
endif ()
if (ODBC_FOUND)
set (Poco_DataODBC_FOUND 1)
set (Poco_DataODBC_LIBRARY PocoDataODBC ${ODBC_LIBRARIES} ${LTDL_LIBRARY})
set (Poco_DataODBC_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/poco/Data/ODBC/include/")
if (EXISTS "${ClickHouse_SOURCE_DIR}/contrib/poco/SQL/ODBC/include/")
set (Poco_SQL_FOUND 1)
set (Poco_SQLODBC_FOUND 1)
set (Poco_SQL_INCLUDE_DIRS
"${ClickHouse_SOURCE_DIR}/contrib/poco/SQL/include"
"${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include"
)
set (Poco_SQLODBC_INCLUDE_DIRS
"${ClickHouse_SOURCE_DIR}/contrib/poco/SQL/ODBC/include/"
"${ClickHouse_SOURCE_DIR}/contrib/poco/Data/ODBC/include/"
)
set (Poco_SQL_LIBRARY PocoSQL)
set (Poco_SQLODBC_LIBRARY PocoSQLODBC ${ODBC_LIBRARIES} ${LTDL_LIBRARY})
else ()
set (Poco_DataODBC_FOUND 1)
set (Poco_DataODBC_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/poco/Data/ODBC/include/" "${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include")
set (Poco_Data_LIBRARY PocoData)
set (Poco_DataODBC_LIBRARY PocoDataODBC ${ODBC_LIBRARIES} ${LTDL_LIBRARY})
endif ()
endif ()
# TODO! fix internal ssl
@ -66,7 +91,6 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
set (Poco_Foundation_LIBRARY PocoFoundation)
set (Poco_Util_LIBRARY PocoUtil)
set (Poco_Net_LIBRARY PocoNet)
set (Poco_Data_LIBRARY PocoData)
set (Poco_XML_LIBRARY PocoXML)
endif ()

View File

@ -161,11 +161,20 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
target_include_directories (clickhouse_common_io BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
endif ()
if (Poco_DataODBC_FOUND)
target_link_libraries (dbms ${Poco_DataODBC_LIBRARY})
target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES})
if (Poco_SQLODBC_FOUND)
target_link_libraries (clickhouse_common_io ${Poco_SQL_LIBRARY})
target_include_directories (clickhouse_common_io PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQL_INCLUDE_DIRS})
target_link_libraries (dbms ${Poco_SQLODBC_LIBRARY} ${Poco_SQL_LIBRARY})
target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQLODBC_INCLUDE_DIRS} PUBLIC ${Poco_SQL_INCLUDE_DIRS})
endif()
if (Poco_DataODBC_FOUND)
target_link_libraries (clickhouse_common_io ${Poco_Data_LIBRARY})
target_link_libraries (dbms ${Poco_DataODBC_LIBRARY})
target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_DataODBC_INCLUDE_DIRS})
endif()
if (Poco_MongoDB_FOUND)
target_link_libraries (dbms ${Poco_MongoDB_LIBRARY})
endif()
@ -212,6 +221,7 @@ endif ()
target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories (clickhouse_common_io PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories (clickhouse_common_io PUBLIC ${PCG_RANDOM_INCLUDE_DIR})
target_include_directories (clickhouse_common_io PUBLIC ${Poco_DataODBC_INCLUDE_DIRS})
target_include_directories (clickhouse_common_io BEFORE PUBLIC ${DOUBLE_CONVERSION_INCLUDE_DIR})
# also for copy_headers.sh:

View File

@ -1,6 +1,6 @@
# This strings autochanged from release_lib.sh:
set(VERSION_DESCRIBE v1.1.54372-testing)
set(VERSION_REVISION 54372)
set(VERSION_DESCRIBE v1.1.54373-testing)
set(VERSION_REVISION 54373)
# end of autochange
set (VERSION_MAJOR 1)

View File

@ -11,6 +11,7 @@
#include <Poco/DOM/Comment.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/StringUtils/StringUtils.h>
#define PREPROCESSED_SUFFIX "-preprocessed"

View File

@ -28,7 +28,9 @@
M(MemoryTrackingForMerges) \
M(LeaderElection) \
M(EphemeralNode) \
M(ZooKeeperSession) \
M(ZooKeeperWatch) \
M(ZooKeeperRequest) \
M(DelayedInserts) \
M(ContextLockWait) \
M(StorageBufferRows) \

View File

@ -56,14 +56,20 @@
\
M(ZooKeeperInit) \
M(ZooKeeperTransactions) \
M(ZooKeeperGetChildren) \
M(ZooKeeperList) \
M(ZooKeeperCreate) \
M(ZooKeeperRemove) \
M(ZooKeeperExists) \
M(ZooKeeperGet) \
M(ZooKeeperSet) \
M(ZooKeeperMulti) \
M(ZooKeeperCheck) \
M(ZooKeeperClose) \
M(ZooKeeperWatchResponse) \
M(ZooKeeperExceptions) \
M(ZooKeeperWaitMicroseconds) \
M(ZooKeeperBytesSent) \
M(ZooKeeperBytesReceived) \
\
M(DistributedConnectionFailTry) \
M(DistributedConnectionMissingTable) \

View File

@ -1,21 +1,6 @@
#pragma once
#include <Common/Exception.h>
#include "Types.h"
#include <Common/ProfileEvents.h>
namespace DB
{
namespace ErrorCodes
{
extern const int KEEPER_EXCEPTION;
}
}
namespace ProfileEvents
{
extern const Event ZooKeeperExceptions;
}
namespace zkutil
@ -43,42 +28,7 @@ inline bool isUserError(int32_t zk_return_code)
}
class KeeperException : public DB::Exception
{
private:
/// delegate constructor, used to minimize repetition; last parameter used for overload resolution
KeeperException(const std::string & msg, const int32_t code, int)
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code) { incrementEventCounter(); }
public:
KeeperException(const std::string & msg, const int32_t code)
: KeeperException(msg + " (" + ZooKeeperImpl::ZooKeeper::errorMessage(code) + ")", code, 0) {}
explicit KeeperException(const int32_t code) : KeeperException(ZooKeeperImpl::ZooKeeper::errorMessage(code), code, 0) {}
KeeperException(const int32_t code, const std::string & path)
: KeeperException(std::string{ZooKeeperImpl::ZooKeeper::errorMessage(code)} + ", path: " + path, code, 0) {}
KeeperException(const KeeperException & exc) : DB::Exception(exc), code(exc.code) { incrementEventCounter(); }
const char * name() const throw() override { return "zkutil::KeeperException"; }
const char * className() const throw() override { return "zkutil::KeeperException"; }
KeeperException * clone() const override { return new KeeperException(*this); }
/// Any error related with network or master election
/// In case of these errors you should reinitialize ZooKeeper session.
bool isHardwareError() const
{
return zkutil::isHardwareError(code);
}
const int32_t code;
private:
static void incrementEventCounter()
{
ProfileEvents::increment(ProfileEvents::ZooKeeperExceptions);
}
};
using KeeperException = ZooKeeperImpl::Exception;
class KeeperMultiException : public KeeperException

View File

@ -1,5 +1,7 @@
#include "KeeperException.h"
#include "Lock.h"
using namespace zkutil;
bool Lock::tryLock()

View File

@ -40,7 +40,7 @@ namespace zkutil
{
unlock();
}
catch (const zkutil::KeeperException & e)
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}

View File

@ -1,4 +1,5 @@
#include "ZooKeeper.h"
#include "KeeperException.h"
#include <random>
#include <pcg_random.hpp>
@ -6,7 +7,6 @@
#include <boost/algorithm/string.hpp>
#include <common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/PODArray.h>
#include <Common/randomSeed.h>
@ -15,25 +15,6 @@
#define ZOOKEEPER_OPERATION_TIMEOUT_MS 1000
namespace ProfileEvents
{
extern const Event ZooKeeperInit;
extern const Event ZooKeeperTransactions;
extern const Event ZooKeeperCreate;
extern const Event ZooKeeperRemove;
extern const Event ZooKeeperExists;
extern const Event ZooKeeperMulti;
extern const Event ZooKeeperGet;
extern const Event ZooKeeperSet;
extern const Event ZooKeeperGetChildren;
}
namespace CurrentMetrics
{
extern const Metric ZooKeeperWatch;
}
namespace DB
{
namespace ErrorCodes
@ -84,8 +65,6 @@ void ZooKeeper::init(const std::string & hosts_, const std::string & identity_,
Poco::Timespan(0, ZOOKEEPER_CONNECTION_TIMEOUT_MS * 1000),
Poco::Timespan(0, ZOOKEEPER_OPERATION_TIMEOUT_MS * 1000));
ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
LOG_TRACE(log, "initialized, hosts: " << hosts << (chroot.empty() ? "" : ", chroot: " + chroot));
if (!chroot.empty() && !exists("/"))
@ -195,9 +174,6 @@ int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
impl->list(path, callback, watch_callback);
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperGetChildren);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
@ -235,9 +211,6 @@ int32_t ZooKeeper::createImpl(const std::string & path, const std::string & data
impl->create(path, data, mode & 1, mode & 2, {}, callback); /// TODO better mode
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperCreate);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
@ -305,9 +278,6 @@ int32_t ZooKeeper::removeImpl(const std::string & path, int32_t version)
impl->remove(path, version, callback);
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
@ -342,9 +312,6 @@ int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat, WatchCallba
impl->exists(path, callback, watch_callback);
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperExists);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
@ -383,9 +350,6 @@ int32_t ZooKeeper::getImpl(const std::string & path, std::string & res, Stat * s
impl->get(path, callback, watch_callback);
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperGet);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
@ -434,9 +398,6 @@ int32_t ZooKeeper::setImpl(const std::string & path, const std::string & data,
impl->set(path, data, version, callback);
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperSet);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
@ -486,9 +447,6 @@ int32_t ZooKeeper::multiImpl(const Requests & requests, Responses & responses)
impl->multi(requests, callback);
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return code;
}
@ -595,9 +553,6 @@ void ZooKeeper::waitForDisappear(const std::string & path)
impl->exists(path, callback, watch);
event.wait();
ProfileEvents::increment(ProfileEvents::ZooKeeperExists);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
if (code == ZooKeeperImpl::ZooKeeper::ZNONODE)
return;
@ -646,9 +601,6 @@ std::future<ZooKeeperImpl::ZooKeeper::GetResponse> ZooKeeper::asyncGet(const std
};
impl->get(path, std::move(callback), {});
ProfileEvents::increment(ProfileEvents::ZooKeeperGet);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return future;
}
@ -667,9 +619,6 @@ std::future<ZooKeeperImpl::ZooKeeper::GetResponse> ZooKeeper::asyncTryGet(const
};
impl->get(path, std::move(callback), {});
ProfileEvents::increment(ProfileEvents::ZooKeeperGet);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return future;
}
@ -687,9 +636,6 @@ std::future<ZooKeeperImpl::ZooKeeper::ExistsResponse> ZooKeeper::asyncExists(con
};
impl->exists(path, std::move(callback), {});
ProfileEvents::increment(ProfileEvents::ZooKeeperExists);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return future;
}
@ -708,9 +654,6 @@ std::future<ZooKeeperImpl::ZooKeeper::ListResponse> ZooKeeper::asyncGetChildren(
};
impl->list(path, std::move(callback), {});
ProfileEvents::increment(ProfileEvents::ZooKeeperGetChildren);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return future;
}
@ -728,9 +671,6 @@ std::future<ZooKeeperImpl::ZooKeeper::RemoveResponse> ZooKeeper::asyncRemove(con
};
impl->remove(path, version, std::move(callback));
ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return future;
}
@ -748,9 +688,6 @@ std::future<ZooKeeperImpl::ZooKeeper::RemoveResponse> ZooKeeper::asyncTryRemove(
};
impl->remove(path, version, std::move(callback));
ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return future;
}
@ -765,9 +702,6 @@ std::future<ZooKeeperImpl::ZooKeeper::MultiResponse> ZooKeeper::tryAsyncMulti(co
};
impl->multi(ops, std::move(callback));
ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return future;
}
@ -785,9 +719,6 @@ std::future<ZooKeeperImpl::ZooKeeper::MultiResponse> ZooKeeper::asyncMulti(const
};
impl->multi(ops, std::move(callback));
ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
return future;
}

View File

@ -1,7 +1,6 @@
#pragma once
#include "Types.h"
#include "KeeperException.h"
#include <Poco/Util/LayeredConfiguration.h>
#include <unordered_set>
#include <future>

View File

@ -1,9 +1,11 @@
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Poco/Exception.h>
@ -11,7 +13,40 @@
#include <array>
//#include <iostream>
namespace DB
{
namespace ErrorCodes
{
extern const int KEEPER_EXCEPTION;
}
}
namespace ProfileEvents
{
extern const Event ZooKeeperExceptions;
extern const Event ZooKeeperInit;
extern const Event ZooKeeperTransactions;
extern const Event ZooKeeperCreate;
extern const Event ZooKeeperRemove;
extern const Event ZooKeeperExists;
extern const Event ZooKeeperMulti;
extern const Event ZooKeeperGet;
extern const Event ZooKeeperSet;
extern const Event ZooKeeperList;
extern const Event ZooKeeperCheck;
extern const Event ZooKeeperClose;
extern const Event ZooKeeperWaitMicroseconds;
extern const Event ZooKeeperBytesSent;
extern const Event ZooKeeperBytesReceived;
extern const Event ZooKeeperWatchResponse;
}
namespace CurrentMetrics
{
extern const Metric ZooKeeperRequest;
extern const Metric ZooKeeperWatch;
}
/** ZooKeeper wire protocol.
@ -228,6 +263,33 @@ after:
namespace ZooKeeperImpl
{
Exception::Exception(const std::string & msg, const int32_t code, int)
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code)
{
ProfileEvents::increment(ProfileEvents::ZooKeeperExceptions);
}
Exception::Exception(const std::string & msg, const int32_t code)
: Exception(msg + " (" + ZooKeeperImpl::ZooKeeper::errorMessage(code) + ")", code, 0)
{
}
Exception::Exception(const int32_t code)
: Exception(ZooKeeperImpl::ZooKeeper::errorMessage(code), code, 0)
{
}
Exception::Exception(const int32_t code, const std::string & path)
: Exception(std::string{ZooKeeperImpl::ZooKeeper::errorMessage(code)} + ", path: " + path, code, 0)
{
}
Exception::Exception(const Exception & exc)
: DB::Exception(exc), code(exc.code)
{
}
using namespace DB;
@ -304,10 +366,10 @@ void read(String & s, ReadBuffer & in)
static constexpr int32_t max_string_size = 1 << 20;
int32_t size = 0;
read(size, in);
if (size < 0)
throw Exception("Negative size"); /// TODO Actually it means that zookeeper node have NULL value. Maybe better to treat it like empty string.
if (size < 0) /// TODO Actually it means that zookeeper node has NULL value. Maybe better to treat it like empty string.
throw Exception("Negative size while reading string from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
if (size > max_string_size)
throw Exception("Too large string size"); /// TODO error code
throw Exception("Too large string size while reading from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
s.resize(size);
in.read(&s[0], size);
}
@ -317,7 +379,7 @@ template <size_t N> void read(std::array<char, N> & s, ReadBuffer & in)
int32_t size = 0;
read(size, in);
if (size != N)
throw Exception("Unexpected array size"); /// TODO error code
throw Exception("Unexpected array size while reading from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
in.read(&s[0], N);
}
@ -347,9 +409,9 @@ template <typename T> void read(std::vector<T> & arr, ReadBuffer & in)
int32_t size = 0;
read(size, in);
if (size < 0)
throw Exception("Negative size");
throw Exception("Negative size while reading array from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
if (size > max_array_size)
throw Exception("Too large array size"); /// TODO error code
throw Exception("Too large array size while reading from ZooKeeper", ZooKeeper::ZMARSHALLINGERROR);
arr.resize(size);
for (auto & elem : arr)
read(elem, in);
@ -478,6 +540,8 @@ ZooKeeper::ZooKeeper(
send_thread = std::thread([this] { sendThread(); });
receive_thread = std::thread([this] { receiveThread(); });
ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
}
@ -488,6 +552,7 @@ void ZooKeeper::connect(
static constexpr size_t num_tries = 3;
bool connected = false;
WriteBufferFromOwnString fail_reasons;
for (size_t try_no = 0; try_no < num_tries; ++try_no)
{
for (const auto & address : addresses)
@ -500,10 +565,11 @@ void ZooKeeper::connect(
}
catch (const Poco::Net::NetException & e)
{
/// TODO log exception
fail_reasons << "\n" << getCurrentExceptionMessage(false);
}
catch (const Poco::TimeoutException & e)
{
fail_reasons << "\n" << getCurrentExceptionMessage(false);
}
}
@ -512,7 +578,22 @@ void ZooKeeper::connect(
}
if (!connected)
throw Exception("All connection tries failed"); /// TODO more info; error code
{
WriteBufferFromOwnString out;
out << "All connection tries failed while connecting to ZooKeeper. Addresses: ";
bool first = true;
for (const auto & address : addresses)
{
if (first)
first = false;
else
out << ", ";
out << address.toString();
}
out << fail_reasons.str();
throw Exception(out.str(), ZCONNECTIONLOSS);
}
socket.setReceiveTimeout(operation_timeout);
socket.setSendTimeout(operation_timeout);
@ -553,15 +634,15 @@ void ZooKeeper::receiveHandshake()
read(handshake_length);
if (handshake_length != 36)
throw Exception("Unexpected handshake length received: " + toString(handshake_length));
throw Exception("Unexpected handshake length received: " + toString(handshake_length), ZMARSHALLINGERROR);
read(protocol_version_read);
if (protocol_version_read != protocol_version)
throw Exception("Unexpected protocol version: " + toString(protocol_version_read));
throw Exception("Unexpected protocol version: " + toString(protocol_version_read), ZMARSHALLINGERROR);
read(timeout);
if (timeout != session_timeout.totalMilliseconds())
throw Exception("Received different session timeout from server: " + toString(timeout));
throw Exception("Received different session timeout from server: " + toString(timeout), ZMARSHALLINGERROR);
read(session_id);
read(passwd);
@ -588,14 +669,17 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
read(err);
if (xid != auth_xid)
throw Exception("Unexpected event recievent in reply to auth request: " + toString(xid));
throw Exception("Unexpected event recieved in reply to auth request: " + toString(xid),
ZMARSHALLINGERROR);
int32_t actual_length = in->count() - count_before_event;
if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length));
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length),
ZMARSHALLINGERROR);
if (err)
throw Exception("Error received in reply to auth request. Code: " + toString(err) + ". Message: " + String(errorMessage(err)));
throw Exception("Error received in reply to auth request. Code: " + toString(err) + ". Message: " + String(errorMessage(err)),
ZMARSHALLINGERROR);
}
@ -607,6 +691,8 @@ void ZooKeeper::sendThread()
{
while (!expired)
{
auto prev_bytes_sent = out->count();
auto now = clock::now();
auto next_heartbeat_time = prev_heartbeat_time + std::chrono::milliseconds(session_timeout.totalMilliseconds() / 3);
@ -635,6 +721,8 @@ void ZooKeeper::sendThread()
request.xid = ping_xid;
request.write(*out);
}
ProfileEvents::increment(ProfileEvents::ZooKeeperBytesSent, out->count() - prev_bytes_sent);
}
}
catch (...)
@ -657,19 +745,21 @@ void ZooKeeper::receiveThread()
Int64 waited = 0;
while (!expired)
{
auto prev_bytes_received = in->count();
clock::time_point now = clock::now();
UInt64 max_wait = operation_timeout.totalMicroseconds();
bool has_operations = false;
std::optional<RequestInfo> earliest_operation;
{
std::lock_guard lock(operations_mutex);
if (!operations.empty())
{
/// Operations are ordered by xid (and consequently, by time).
has_operations = true;
auto earliest_operation_deadline = operations.begin()->second.time + std::chrono::microseconds(operation_timeout.totalMicroseconds());
earliest_operation = operations.begin()->second;
auto earliest_operation_deadline = earliest_operation->time + std::chrono::microseconds(operation_timeout.totalMicroseconds());
if (now > earliest_operation_deadline)
throw Exception("Operation timeout");
throw Exception("Operation timeout (deadline already expired) for path: " + earliest_operation->request->getPath(), ZOPERATIONTIMEOUT);
max_wait = std::chrono::duration_cast<std::chrono::microseconds>(earliest_operation_deadline - now).count();
}
}
@ -684,13 +774,15 @@ void ZooKeeper::receiveThread()
}
else
{
if (has_operations)
throw Exception("Operation timeout");
if (earliest_operation)
throw Exception("Operation timeout (no response) for path: " + earliest_operation->request->getPath(), ZOPERATIONTIMEOUT);
waited += max_wait;
if (waited > session_timeout.totalMicroseconds())
throw Exception("Nothing is received in session timeout");
throw Exception("Nothing is received in session timeout", ZOPERATIONTIMEOUT);
}
ProfileEvents::increment(ProfileEvents::ZooKeeperBytesReceived, in->count() - prev_bytes_received);
}
}
catch (...)
@ -729,10 +821,10 @@ ZooKeeper::ResponsePtr ZooKeeper::CloseRequest::makeResponse() const { return st
void addRootPath(String & path, const String & root_path)
{
if (path.empty())
throw Exception("Path cannot be empty");
throw Exception("Path cannot be empty", ZooKeeper::ZBADARGUMENTS);
if (path[0] != '/')
throw Exception("Path must begin with /");
throw Exception("Path must begin with /", ZooKeeper::ZBADARGUMENTS);
if (root_path.empty())
return;
@ -749,7 +841,7 @@ void removeRootPath(String & path, const String & root_path)
return;
if (path.size() <= root_path.size())
throw Exception("Received path is not longer than root_path");
throw Exception("Received path is not longer than root_path", ZooKeeper::ZDATAINCONSISTENCY);
path = path.substr(root_path.size());
}
@ -798,14 +890,13 @@ void ZooKeeper::receiveEvent()
if (xid == ping_xid)
{
if (err)
throw Exception("Received error in heartbeat response: " + String(errorMessage(err)));
throw Exception("Received error in heartbeat response: " + String(errorMessage(err)), ZRUNTIMEINCONSISTENCY);
response = std::make_shared<HeartbeatResponse>();
// std::cerr << "Received heartbeat\n";
}
else if (xid == watch_xid)
{
ProfileEvents::increment(ProfileEvents::ZooKeeperWatchResponse);
response = std::make_shared<WatchResponse>();
request_info.callback = [this](const Response & response)
@ -832,11 +923,10 @@ void ZooKeeper::receiveEvent()
if (callback)
callback(watch_response); /// NOTE We may process callbacks not under mutex.
CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, it->second.size());
watches.erase(it);
}
};
// std::cerr << "Received watch\n";
}
else
{
@ -845,15 +935,17 @@ void ZooKeeper::receiveEvent()
auto it = operations.find(xid);
if (it == operations.end())
throw Exception("Received response for unknown xid");
throw Exception("Received response for unknown xid", ZRUNTIMEINCONSISTENCY);
request_info = std::move(it->second);
operations.erase(it);
CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest);
}
// std::cerr << "Received response: " << request_info.request->getOpNum() << "\n";
response = request_info.request->makeResponse();
auto elapsed_microseconds = std::chrono::duration_cast<std::chrono::microseconds>(clock::now() - request_info.time).count();
ProfileEvents::increment(ProfileEvents::ZooKeeperWaitMicroseconds, elapsed_microseconds);
}
if (err)
@ -866,7 +958,7 @@ void ZooKeeper::receiveEvent()
int32_t actual_length = in->count() - count_before_event;
if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length));
throw Exception("Response length doesn't match. Expected: " + toString(length) + ", actual: " + toString(actual_length), ZMARSHALLINGERROR);
if (request_info.callback)
request_info.callback(*response);
@ -905,6 +997,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
request_info.callback(*response);
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size());
operations.clear();
}
@ -923,6 +1016,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
callback(response);
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size());
watches.clear();
}
}
@ -1058,12 +1152,13 @@ void ZooKeeper::ErrorResponse::readImpl(ReadBuffer & in)
ZooKeeperImpl::read(read_error, in);
if (read_error != error)
throw Exception("Error code in ErrorResponse (" + toString(read_error) + ") doesn't match error code in header (" + toString(error) + ")");
throw Exception("Error code in ErrorResponse (" + toString(read_error) + ") doesn't match error code in header (" + toString(error) + ")",
ZMARSHALLINGERROR);
}
void ZooKeeper::CloseResponse::readImpl(ReadBuffer &)
{
throw Exception("Received response for close request");
throw Exception("Received response for close request", ZRUNTIMEINCONSISTENCY);
}
ZooKeeper::MultiResponse::MultiResponse(const Requests & requests)
@ -1086,10 +1181,8 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
ZooKeeperImpl::read(done, in);
ZooKeeperImpl::read(op_error, in);
// std::cerr << "Received result for multi: " << op_num << "\n";
if (done)
throw Exception("Not enough results received for multi transaction");
throw Exception("Not enough results received for multi transaction", ZMARSHALLINGERROR);
/// op_num == -1 is special for multi transaction.
/// For unknown reason, error code is duplicated in header and in response body.
@ -1123,11 +1216,11 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
ZooKeeperImpl::read(error, in);
if (!done)
throw Exception("Too many results received for multi transaction");
throw Exception("Too many results received for multi transaction", ZMARSHALLINGERROR);
if (op_num != -1)
throw Exception("Unexpected op_num received at the end of results for multi transaction");
throw Exception("Unexpected op_num received at the end of results for multi transaction", ZMARSHALLINGERROR);
if (error != -1)
throw Exception("Unexpected error value received at the end of results for multi transaction");
throw Exception("Unexpected error value received at the end of results for multi transaction", ZMARSHALLINGERROR);
}
}
@ -1136,7 +1229,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
{
/// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop.
if (expired && info.request->xid != close_xid)
throw Exception("Session expired");
throw Exception("Session expired", ZSESSIONEXPIRED);
info.request->addRootPath(root_path);
@ -1146,10 +1239,13 @@ void ZooKeeper::pushRequest(RequestInfo && info)
{
info.request->xid = xid.fetch_add(1);
if (info.request->xid < 0)
throw Exception("XID overflow");
throw Exception("XID overflow", ZSESSIONEXPIRED);
}
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
{
CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest);
std::lock_guard lock(operations_mutex);
operations[info.request->xid] = info;
}
@ -1157,12 +1253,13 @@ void ZooKeeper::pushRequest(RequestInfo && info)
if (info.watch)
{
info.request->has_watch = true;
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
std::lock_guard lock(watches_mutex);
watches[info.request->getPath()].emplace_back(std::move(info.watch));
}
if (!requests.tryPush(info.request, session_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within session timeout");
if (!requests.tryPush(info.request, operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
}
@ -1186,6 +1283,7 @@ void ZooKeeper::create(
request_info.callback = [callback](const Response & response) { callback(typeid_cast<const CreateResponse &>(response)); };
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperCreate);
}
@ -1203,6 +1301,7 @@ void ZooKeeper::remove(
request_info.callback = [callback](const Response & response) { callback(typeid_cast<const RemoveResponse &>(response)); };
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperRemove);
}
@ -1220,6 +1319,7 @@ void ZooKeeper::exists(
request_info.watch = watch;
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperExists);
}
@ -1237,6 +1337,7 @@ void ZooKeeper::get(
request_info.watch = watch;
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperGet);
}
@ -1256,6 +1357,7 @@ void ZooKeeper::set(
request_info.callback = [callback](const Response & response) { callback(typeid_cast<const SetResponse &>(response)); };
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperSet);
}
@ -1273,6 +1375,7 @@ void ZooKeeper::list(
request_info.watch = watch;
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperList);
}
@ -1290,6 +1393,7 @@ void ZooKeeper::check(
request_info.callback = [callback](const Response & response) { callback(typeid_cast<const CheckResponse &>(response)); };
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperCheck);
}
@ -1310,6 +1414,7 @@ void ZooKeeper::multi(
request_info.callback = [callback](const Response & response) { callback(typeid_cast<const MultiResponse &>(response)); };
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
}
@ -1322,6 +1427,7 @@ void ZooKeeper::close()
request_info.request = std::make_shared<CloseRequest>(std::move(request));
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperClose);
}

View File

@ -2,6 +2,7 @@
#include <Core/Types.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentMetrics.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
@ -23,12 +24,38 @@
#include <functional>
namespace CurrentMetrics
{
extern const Metric ZooKeeperSession;
}
namespace ZooKeeperImpl
{
using namespace DB;
class Exception : public DB::Exception
{
private:
/// Delegate constructor, used to minimize repetition; last parameter used for overload resolution.
Exception(const std::string & msg, const int32_t code, int);
public:
explicit Exception(const int32_t code);
Exception(const std::string & msg, const int32_t code);
Exception(const int32_t code, const std::string & path);
Exception(const Exception & exc);
const char * name() const throw() override { return "ZooKeeperImpl::Exception"; }
const char * className() const throw() override { return "ZooKeeperImpl::Exception"; }
Exception * clone() const override { return new Exception(*this); }
const int32_t code;
};
/** Usage scenario:
* - create an object and issue commands;
* - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap:
@ -543,6 +570,8 @@ private:
template <typename T>
void read(T &);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ZooKeeperSession};
};
};

View File

@ -1,5 +1,6 @@
#include <Common/typeid_cast.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/StringUtils/StringUtils.h>
#include <iostream>
#include <chrono>

View File

@ -1,5 +1,6 @@
#include <iostream>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Poco/ConsoleChannel.h>
#include <Common/Exception.h>

View File

@ -1,4 +1,5 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/typeid_cast.h>
#include <iostream>
#include <port/unistd.h>

View File

@ -9,6 +9,7 @@
#cmakedefine01 USE_RDKAFKA
#cmakedefine01 USE_CAPNP
#cmakedefine01 USE_EMBEDDED_COMPILER
#cmakedefine01 Poco_SQLODBC_FOUND
#cmakedefine01 Poco_DataODBC_FOUND
#cmakedefine01 Poco_MongoDB_FOUND
#cmakedefine01 Poco_NetSSL_FOUND

View File

@ -34,6 +34,7 @@ const char * auto_config_build[]
"USE_VECTORCLASS", "@USE_VECTORCLASS@",
"USE_RDKAFKA", "@USE_RDKAFKA@",
"USE_CAPNP", "@USE_CAPNP@",
"USE_Poco_SQLODBC", "@Poco_SQLODBC_FOUND@",
"USE_Poco_DataODBC", "@Poco_DataODBC_FOUND@",
"USE_Poco_MongoDB", "@Poco_MongoDB_FOUND@",
"USE_Poco_NetSSL", "@Poco_NetSSL_FOUND@",

View File

@ -19,7 +19,7 @@
#if Poco_MongoDB_FOUND
#include <Dictionaries/MongoDBDictionarySource.h>
#endif
#if Poco_DataODBC_FOUND
#if Poco_SQLODBC_FOUND || Poco_DataODBC_FOUND
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <Poco/Data/ODBC/Connector.h>
@ -89,7 +89,7 @@ Block createSampleBlock(const DictionaryStructure & dict_struct)
DictionarySourceFactory::DictionarySourceFactory()
: log(&Poco::Logger::get("DictionarySourceFactory"))
{
#if Poco_DataODBC_FOUND
#if Poco_SQLODBC_FOUND || Poco_DataODBC_FOUND
Poco::Data::ODBC::Connector::registerConnector();
#endif
}
@ -154,7 +154,7 @@ DictionarySourcePtr DictionarySourceFactory::create(
}
else if ("odbc" == source_type)
{
#if Poco_DataODBC_FOUND
#if Poco_SQLODBC_FOUND || Poco_DataODBC_FOUND
return std::make_unique<ODBCDictionarySource>(dict_struct, config, config_prefix + ".odbc", sample_block, context);
#else
throw Exception{"Dictionary source of type `odbc` is disabled because poco library was built without ODBC support.",

View File

@ -1,17 +1,15 @@
#pragma once
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <Poco/Data/SessionPool.h>
#pragma GCC diagnostic pop
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/ExternalQueryBuilder.h>
#include <Dictionaries/DictionaryStructure.h>
namespace Poco
{
namespace Data
{
class SessionPool;
}
namespace Util
{
class AbstractConfiguration;

View File

@ -515,12 +515,6 @@ void Context::setConfig(const ConfigurationPtr & config)
shared->config = config;
}
ConfigurationPtr Context::getConfig() const
{
auto lock = getLock();
return shared->config;
}
Poco::Util::AbstractConfiguration & Context::getConfigRef() const
{
auto lock = getLock();
@ -689,10 +683,10 @@ void Context::assertTableExists(const String & database_name, const String & tab
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() == it)
throw Exception("Database " + db + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
throw Exception("Database " + backQuoteIfNeed(db) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
if (!it->second->isTableExist(*this, table_name))
throw Exception("Table " + db + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception("Table " + backQuoteIfNeed(db) + "." + backQuoteIfNeed(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
}
@ -706,7 +700,7 @@ void Context::assertTableDoesntExist(const String & database_name, const String
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() != it && it->second->isTableExist(*this, table_name))
throw Exception("Table " + db + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception("Table " + backQuoteIfNeed(db) + "." + backQuoteIfNeed(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
@ -719,7 +713,7 @@ void Context::assertDatabaseExists(const String & database_name, bool check_data
checkDatabaseAccessRights(db);
if (shared->databases.end() == shared->databases.find(db))
throw Exception("Database " + db + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
throw Exception("Database " + backQuoteIfNeed(db) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
}
@ -731,7 +725,7 @@ void Context::assertDatabaseDoesntExist(const String & database_name) const
checkDatabaseAccessRights(db);
if (shared->databases.end() != shared->databases.find(db))
throw Exception("Database " + db + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS);
throw Exception("Database " + backQuoteIfNeed(db) + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS);
}
@ -801,7 +795,7 @@ StoragePtr Context::getTableImpl(const String & database_name, const String & ta
if (shared->databases.end() == it)
{
if (exception)
*exception = Exception("Database " + db + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
*exception = Exception("Database " + backQuoteIfNeed(db) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
return {};
}
@ -809,7 +803,7 @@ StoragePtr Context::getTableImpl(const String & database_name, const String & ta
if (!table)
{
if (exception)
*exception = Exception("Table " + db + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
*exception = Exception("Table " + backQuoteIfNeed(db) + "." + backQuoteIfNeed(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return {};
}
@ -820,7 +814,7 @@ StoragePtr Context::getTableImpl(const String & database_name, const String & ta
void Context::addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast)
{
if (external_tables.end() != external_tables.find(table_name))
throw Exception("Temporary table " + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
external_tables[table_name] = std::pair(storage, ast);
}
@ -926,7 +920,7 @@ ASTPtr Context::getCreateExternalTableQuery(const String & table_name) const
{
TableAndCreateASTs::const_iterator jt = external_tables.find(table_name);
if (external_tables.end() == jt)
throw Exception("Temporary Table" + table_name + " doesn't exist", ErrorCodes::UNKNOWN_TABLE);
throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " doesn't exist", ErrorCodes::UNKNOWN_TABLE);
return jt->second.second;
}
@ -1326,21 +1320,13 @@ DDLWorker & Context::getDDLWorker() const
return *shared->ddl_worker;
}
void Context::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
{
std::lock_guard<std::mutex> lock(shared->zookeeper_mutex);
if (shared->zookeeper)
throw Exception("ZooKeeper client has already been set.", ErrorCodes::LOGICAL_ERROR);
shared->zookeeper = std::move(zookeeper);
}
zkutil::ZooKeeperPtr Context::getZooKeeper() const
{
std::lock_guard<std::mutex> lock(shared->zookeeper_mutex);
if (shared->zookeeper && shared->zookeeper->expired())
if (!shared->zookeeper)
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(getConfigRef(), "zookeeper");
else if (shared->zookeeper->expired())
shared->zookeeper = shared->zookeeper->startNewSession();
return shared->zookeeper;

View File

@ -140,7 +140,6 @@ public:
/// Global application configuration settings.
void setConfig(const ConfigurationPtr & config);
ConfigurationPtr getConfig() const;
Poco::Util::AbstractConfiguration & getConfigRef() const;
/** Take the list of users, quotas and configuration profiles from this config.
@ -300,7 +299,6 @@ public:
MergeList & getMergeList();
const MergeList & getMergeList() const;
void setZooKeeper(std::shared_ptr<zkutil::ZooKeeper> zookeeper);
/// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call.
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;
/// Has ready or expired ZooKeeper

View File

@ -32,6 +32,7 @@
#include <Columns/ColumnArray.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Lock.h>
#include <Common/isLocalAddress.h>
#include <Poco/Timestamp.h>
@ -858,7 +859,7 @@ void DDLWorker::run()
}
catch (const zkutil::KeeperException & e)
{
if (!e.isHardwareError())
if (!zkutil::isHardwareError(e.code))
throw;
}
}
@ -886,7 +887,7 @@ void DDLWorker::run()
}
catch (zkutil::KeeperException & e)
{
if (e.isHardwareError())
if (zkutil::isHardwareError(e.code))
{
LOG_DEBUG(log, "Recovering ZooKeeper session after: " << getCurrentExceptionMessage(false));

View File

@ -1519,7 +1519,9 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block &
}
else
{
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(source_columns, settings);
NamesAndTypesList temp_columns = source_columns;
temp_columns.insert(temp_columns.end(), aggregated_columns.begin(), aggregated_columns.end());
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(temp_columns, settings);
getRootActions(func->arguments->children.at(0), true, false, temp_actions);
Block sample_block_with_calculated_columns = temp_actions->getSampleBlock();

View File

@ -17,10 +17,19 @@
#include <boost/algorithm/string.hpp>
#include <pcg_random.hpp>
#include <common/logger_useful.h>
#include <common/ThreadPool.h>
#include <daemon/OwnPatternFormatter.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/isLocalAddress.h>
#include <Common/typeid_cast.h>
#include <Common/ClickHouseRevision.h>
#include <Common/formatReadable.h>
#include <Common/escapeForFileName.h>
#include <Client/Connection.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
@ -30,12 +39,6 @@
#include <Interpreters/InterpreterShowCreateQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <common/logger_useful.h>
#include <common/ThreadPool.h>
#include <Common/typeid_cast.h>
#include <Common/ClickHouseRevision.h>
#include <Common/escapeForFileName.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
@ -61,8 +64,6 @@
#include <Storages/StorageDistributed.h>
#include <Databases/DatabaseMemory.h>
#include <Server/StatusFile.h>
#include <Common/formatReadable.h>
#include <daemon/OwnPatternFormatter.h>
namespace DB
@ -715,13 +716,11 @@ class ClusterCopier
{
public:
ClusterCopier(const ConfigurationPtr & zookeeper_config_,
const String & task_path_,
ClusterCopier(const String & task_path_,
const String & host_id_,
const String & proxy_database_name_,
Context & context_)
:
zookeeper_config(zookeeper_config_),
task_zookeeper_path(task_path_),
host_id(host_id_),
working_database_name(proxy_database_name_),
@ -732,7 +731,7 @@ public:
void init()
{
auto zookeeper = getZooKeeper();
auto zookeeper = context.getZooKeeper();
task_description_watch_callback = [this] (const ZooKeeperImpl::ZooKeeper::WatchResponse &)
{
@ -762,8 +761,8 @@ public:
/// Do not initialize tables, will make deferred initialization in process()
getZooKeeper()->createAncestors(getWorkersPathVersion() + "/");
getZooKeeper()->createAncestors(getWorkersPath() + "/");
zookeeper->createAncestors(getWorkersPathVersion() + "/");
zookeeper->createAncestors(getWorkersPath() + "/");
}
template <typename T>
@ -890,7 +889,7 @@ public:
void reloadTaskDescription()
{
auto zookeeper = getZooKeeper();
auto zookeeper = context.getZooKeeper();
task_description_watch_zookeeper = zookeeper;
String task_config_str;
@ -1087,7 +1086,7 @@ protected:
{
LOG_DEBUG(log, "Check that all shards processed partition " << partition_name << " successfully");
auto zookeeper = getZooKeeper();
auto zookeeper = context.getZooKeeper();
Strings status_paths;
for (auto & shard : shards_with_partition)
@ -1213,7 +1212,7 @@ protected:
{
cleaner_holder = zkutil::EphemeralNodeHolder::create(dirt_cleaner_path, *zookeeper, host_id);
}
catch (zkutil::KeeperException & e)
catch (const zkutil::KeeperException & e)
{
if (e.code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
{
@ -1459,7 +1458,7 @@ protected:
TaskTable & task_table = task_shard.task_table;
ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name);
auto zookeeper = getZooKeeper();
auto zookeeper = context.getZooKeeper();
String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
String current_task_is_active_path = task_partition.getActiveWorkerPath();
@ -1693,7 +1692,7 @@ protected:
status = future_is_dirty_checker->get();
future_is_dirty_checker.reset();
}
catch (zkutil::KeeperException & e)
catch (const zkutil::KeeperException & e)
{
future_is_dirty_checker.reset();
throw;
@ -1995,21 +1994,7 @@ protected:
return successful_shards;
}
zkutil::ZooKeeperPtr getZooKeeper()
{
auto zookeeper = context.getZooKeeper();
if (!zookeeper)
{
context.setZooKeeper(std::make_shared<zkutil::ZooKeeper>(*zookeeper_config, "zookeeper"));
zookeeper = context.getZooKeeper();
}
return zookeeper;
}
private:
ConfigurationPtr zookeeper_config;
String task_zookeeper_path;
String task_description_path;
String host_id;
@ -2152,6 +2137,7 @@ void ClusterCopierApp::mainImpl()
auto context = std::make_unique<Context>(Context::createGlobal());
SCOPE_EXIT(context->shutdown());
context->setConfig(zookeeper_configuration);
context->setGlobalContext(*context);
context->setApplicationType(Context::ApplicationType::LOCAL);
context->setPath(process_path);
@ -2165,8 +2151,7 @@ void ClusterCopierApp::mainImpl()
context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
context->setCurrentDatabase(default_database);
std::unique_ptr<ClusterCopier> copier(new ClusterCopier(
zookeeper_configuration, task_path, host_id, default_database, *context));
std::unique_ptr<ClusterCopier> copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, *context);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);

View File

@ -102,12 +102,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setGlobalContext(*global_context);
global_context->setApplicationType(Context::ApplicationType::SERVER);
bool has_zookeeper = false;
if (config().has("zookeeper"))
{
global_context->setZooKeeper(std::make_shared<zkutil::ZooKeeper>(config(), "zookeeper"));
has_zookeeper = true;
}
bool has_zookeeper = config().has("zookeeper");
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });
if (loaded_config.has_zk_includes)

View File

@ -2189,18 +2189,17 @@ bool MergeTreeData::isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr &
bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const
{
/// Make sure that the left side of the IN operator is part of the primary key.
/// If there is a tuple on the left side of the IN operator, each item of the tuple must be part of the primary key.
/// Make sure that the left side of the IN operator contain part of the primary key.
/// If there is a tuple on the left side of the IN operator, at least one item of the tuple must be part of the primary key (probably wrapped by a chain of some acceptable functions).
const ASTFunction * left_in_operand_tuple = typeid_cast<const ASTFunction *>(left_in_operand.get());
if (left_in_operand_tuple && left_in_operand_tuple->name == "tuple")
{
for (const auto & item : left_in_operand_tuple->arguments->children)
if (!isPrimaryKeyColumnPossiblyWrappedInFunctions(item))
/// The tuple itself may be part of the primary key, so check that as a last resort.
return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand);
if (isPrimaryKeyColumnPossiblyWrappedInFunctions(item))
return true;
/// tuple() is invalid but can still be found here since this method may be called before the arguments are validated.
return !left_in_operand_tuple->arguments->children.empty();
/// The tuple itself may be part of the primary key, so check that as a last resort.
return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand);
}
else
{

View File

@ -1,5 +1,6 @@
#include <memory>
#include <Common/setThreadName.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageReplicatedMergeTree.h>

View File

@ -5,8 +5,10 @@
#include <Interpreters/PartLog.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <IO/Operators.h>
namespace ProfileEvents
{
extern const Event DuplicatedInsertedBlocks;
@ -25,6 +27,7 @@ namespace ErrorCodes
extern const int READONLY;
extern const int UNKNOWN_STATUS_OF_INSERT;
extern const int INSERT_WAS_DEDUPLICATED;
extern const int KEEPER_EXCEPTION;
}

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Common/setThreadName.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/randomSeed.h>

View File

@ -1,17 +1,13 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <Poco/Data/SessionPool.h>
#pragma GCC diagnostic pop
namespace Poco
{
namespace Data
{
class SessionPool;
}
}
namespace DB

View File

@ -23,7 +23,7 @@ void registerStorageJoin(StorageFactory & factory);
void registerStorageView(StorageFactory & factory);
void registerStorageMaterializedView(StorageFactory & factory);
#if Poco_DataODBC_FOUND
#if Poco_SQLODBC_FOUND || Poco_DataODBC_FOUND
void registerStorageODBC(StorageFactory & factory);
#endif
@ -56,7 +56,7 @@ void registerStorages()
registerStorageView(factory);
registerStorageMaterializedView(factory);
#if Poco_DataODBC_FOUND
#if Poco_SQLODBC_FOUND || Poco_DataODBC_FOUND
registerStorageODBC(factory);
#endif

View File

@ -6,7 +6,13 @@ list(REMOVE_ITEM clickhouse_table_functions_headers ITableFunction.h TableFuncti
add_library(clickhouse_table_functions ${clickhouse_table_functions_sources})
target_link_libraries(clickhouse_table_functions dbms clickhouse_storages_system ${Poco_Foundation_LIBRARY})
if (Poco_SQLODBC_FOUND)
target_link_libraries (clickhouse_table_functions ${Poco_SQLODBC_LIBRARY})
target_include_directories (clickhouse_table_functions PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQLODBC_INCLUDE_DIRS})
endif ()
if (Poco_DataODBC_FOUND)
target_link_libraries (clickhouse_table_functions ${Poco_DataODBC_LIBRARY})
target_include_directories (clickhouse_table_functions PRIVATE ${ODBC_INCLUDE_DIRECTORIES})
target_include_directories (clickhouse_table_functions PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_DataODBC_INCLUDE_DIRS})
endif ()

View File

@ -1,6 +1,6 @@
#include <TableFunctions/TableFunctionODBC.h>
#if Poco_DataODBC_FOUND
#if Poco_SQLODBC_FOUND || Poco_DataODBC_FOUND
#include <type_traits>
#include <ext/scope_guard.h>

View File

@ -1,7 +1,7 @@
#pragma once
#include <Common/config.h>
#if Poco_DataODBC_FOUND
#if Poco_SQLODBC_FOUND || Poco_DataODBC_FOUND
#include <TableFunctions/ITableFunction.h>

View File

@ -11,7 +11,7 @@ void registerTableFunctionRemote(TableFunctionFactory & factory);
void registerTableFunctionShardByHash(TableFunctionFactory & factory);
void registerTableFunctionNumbers(TableFunctionFactory & factory);
void registerTableFunctionCatBoostPool(TableFunctionFactory & factory);
#if Poco_DataODBC_FOUND
#if Poco_SQLODBC_FOUND || Poco_DataODBC_FOUND
void registerTableFunctionODBC(TableFunctionFactory & factory);
#endif
@ -30,7 +30,7 @@ void registerTableFunctions()
registerTableFunctionNumbers(factory);
registerTableFunctionCatBoostPool(factory);
#if Poco_DataODBC_FOUND
#if Poco_SQLODBC_FOUND || Poco_DataODBC_FOUND
registerTableFunctionODBC(factory);
#endif

View File

@ -0,0 +1,13 @@
all
1 [1]
2 [2]
key, arrayJoin(arr) in (1, 1)
1 1
key, arrayJoin(arr) in ((1, 1), (2, 2))
1 1
2 2
(key, left array join arr) in (1, 1)
1
(key, left array join arr) in ((1, 1), (2, 2))
1
2

View File

@ -0,0 +1,16 @@
create database if not exists test;
drop table if exists test.tab;
create table test.tab (key UInt64, arr Array(UInt64)) Engine = MergeTree order by key;
insert into test.tab values (1, [1]);
insert into test.tab values (2, [2]);
select 'all';
select * from test.tab order by key;
select 'key, arrayJoin(arr) in (1, 1)';
select key, arrayJoin(arr) as val from test.tab where (key, val) in (1, 1);
select 'key, arrayJoin(arr) in ((1, 1), (2, 2))';
select key, arrayJoin(arr) as val from test.tab where (key, val) in ((1, 1), (2, 2)) order by key;
select '(key, left array join arr) in (1, 1)';
select key from test.tab left array join arr as val where (key, val) in (1, 1);
select '(key, left array join arr) in ((1, 1), (2, 2))';
select key from test.tab left array join arr as val where (key, val) in ((1, 1), (2, 2)) order by key;

4
debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (1.1.54372) unstable; urgency=low
clickhouse (1.1.54373) unstable; urgency=low
* Modified source code
-- <robot-metrika-test@yandex-team.ru> Mon, 02 Apr 2018 22:13:54 +0300
-- <robot-metrika-test@yandex-team.ru> Wed, 04 Apr 2018 00:45:07 +0300

View File

@ -31,7 +31,7 @@ Then run:
```bash
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 # optional
sudo apt-get update
sudo apt-get install clickhouse-client clickhouse-server
sudo apt-get install clickhouse-client clickhouse-server-common
```
You can also download and install packages manually from here:

View File

@ -33,6 +33,9 @@ There are libraries for working with ClickHouse for:
- [ClickHouse-Net](https://github.com/killwort/ClickHouse-Net)
- C++
- [clickhouse-cpp](https://github.com/artpaul/clickhouse-cpp/)
- Elixir
- [clickhousex](https://github.com/appodeal/clickhousex/)
- [clickhouse_ecto](https://github.com/appodeal/clickhouse_ecto)
We have not tested these libraries. They are listed in random order.

View File

@ -31,7 +31,7 @@ deb http://repo.yandex.ru/clickhouse/deb/stable/ main/
```bash
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 # optional
sudo apt-get update
sudo apt-get install clickhouse-client clickhouse-server
sudo apt-get install clickhouse-client clickhouse-server-common
```
Также можно скачать и установить пакеты вручную, отсюда: <https://repo.yandex.ru/clickhouse/deb/stable/main/>.

View File

@ -33,5 +33,8 @@
- [ClickHouse-Net](https://github.com/killwort/ClickHouse-Net)
- C++
- [clickhouse-cpp](https://github.com/artpaul/clickhouse-cpp/)
- Elixir
- [clickhousex](https://github.com/appodeal/clickhousex/)
- [clickhouse_ecto](https://github.com/appodeal/clickhouse_ecto)
Библиотеки не тестировались нами. Порядок перечисления произвольный.

View File

@ -206,13 +206,13 @@ int main(int argc, char ** argv)
}
}
catch (zkutil::KeeperException & e)
catch (const zkutil::KeeperException & e)
{
std::cerr << "KeeperException: " << e.displayText() << std::endl;
}
}
}
catch (zkutil::KeeperException & e)
catch (const zkutil::KeeperException & e)
{
std::cerr << "KeeperException: " << e.displayText() << std::endl;
return 1;

View File

@ -3,6 +3,7 @@
#include <boost/program_options.hpp>
#include <Common/Exception.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
int main(int argc, char ** argv)

View File

@ -439,7 +439,7 @@ Then run:
%%
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 # optional
sudo apt-get update
sudo apt-get install -y clickhouse-client clickhouse-server
sudo apt-get install clickhouse-client clickhouse-server-common
%%
You can also download and install packages manually from here:
@ -709,7 +709,7 @@ echo &#39;DROP TABLE t&#39; | POST &#39;http://localhost:8123/&#39;
For successful requests that don&#39;t return a data table, an empty response body is returned.
You can use compression when transmitting data. The compressed data has a non-standard format, and you will need to use a special clickhouse-compressor program to work with it (%%sudo apt-get install clickhouse-utils%%).
You can use compression when transmitting data. The compressed data has a non-standard format, and you will need to use a special compressor program to work with it (%%sudo apt-get install clickhouse-compressor%%).
If you specified &#39;compress=1&#39; in the URL, the server will compress the data it sends you.
If you specified &#39;decompress=1&#39; in the URL, the server will decompress the same data that you pass in the POST method.

View File

@ -449,7 +449,7 @@ deb http://repo.yandex.ru/clickhouse/trusty stable main
%%
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 # optional
sudo apt-get update
sudo apt-get install -y clickhouse-client clickhouse-server
sudo apt-get install clickhouse-client clickhouse-server-common
%%
Также можно скачать и установить пакеты вручную, отсюда:
@ -725,7 +725,7 @@ echo 'DROP TABLE t' | POST 'http://localhost:8123/'
Для запросов, которые не возвращают таблицу с данными, в случае успеха, выдаётся пустое тело ответа.
Вы можете использовать сжатие при передаче данных. Формат сжатых данных нестандартный, и вам придётся использовать для работы с ним специальную программу clickhouse-compressor (%%sudo apt-get install clickhouse-utils%%).
Вы можете использовать сжатие при передаче данных. Формат сжатых данных нестандартный, и вам придётся использовать для работы с ним специальную программу compressor (%%sudo apt-get install clickhouse-compressor%%).
Если вы указали в URL compress=1, то сервер будет сжимать отправляемые вам данные.
Если вы указали в URL decompress=1, то сервер будет разжимать те данные, которые вы передаёте ему POST-ом.

View File

@ -393,7 +393,7 @@ sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 # optional
sudo apt-add-repository "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
sudo apt-get install clickhouse-server-common clickhouse-client -y
sudo service clickhouse-server start
clickhouse-client

View File

@ -51,7 +51,7 @@
<p><b>clickhouse-client</b> package contains <a
href="docs/en/interfaces/cli/">clickhouse-client</a> application —
interactive ClickHouse client. <b>clickhouse-common</b> contains a clickhouse-server binary file. <b>clickhouse-server</b>
interactive ClickHouse client. <b>clickhouse-server-base</b> contains a clickhouse-server binary file. <b>clickhouse-server-common</b>
— contains config files for the clickhouse-server.</p>
<p>Server config files are located in /etc/clickhouse-server/. Before getting to work please notice the <b>path</b>