Merge branch 'master' of github.com:yandex/ClickHouse into pyos-llvm-jit

This commit is contained in:
Alexey Milovidov 2018-05-09 23:34:20 +03:00
commit 3a059b82d2
64 changed files with 529 additions and 297 deletions

2
.gitignore vendored
View File

@ -9,7 +9,7 @@
# auto generated files
*.logrt
build
/build
/docs/en_single_page/
/docs/ru_single_page/
/docs/venv/

3
.gitmodules vendored
View File

@ -34,3 +34,6 @@
[submodule "contrib/boost"]
path = contrib/boost
url = https://github.com/ClickHouse-Extras/boost.git
[submodule "contrib/llvm"]
path = contrib/llvm
url = https://github.com/ClickHouse-Extras/llvm

View File

@ -22,6 +22,10 @@ if (NOT MSVC)
set (NOT_MSVC 1)
endif ()
if (NOT APPLE)
set (NOT_APPLE 1)
endif ()
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (COMPILER_GCC 1)
elseif (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")

View File

@ -1,26 +1,41 @@
option (ENABLE_EMBEDDED_COMPILER "Set to TRUE to enable support for 'compile' option for query execution" 1)
option (USE_INTERNAL_LLVM_LIBRARY "Use bundled or system LLVM library. Default: system library for quicker developer builds." 0)
if (ENABLE_EMBEDDED_COMPILER)
set (LLVM_PATHS "/usr/local/lib/llvm")
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
find_package(LLVM CONFIG PATHS ${LLVM_PATHS})
else ()
find_package(LLVM 5 CONFIG PATHS ${LLVM_PATHS})
if (USE_INTERNAL_LLVM_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/llvm/llvm/CMakeLists.txt")
message (WARNING "submodule contrib/llvm is missing. to fix try run: \n git submodule update --init --recursive")
set (USE_INTERNAL_LLVM_LIBRARY 0)
endif ()
if (LLVM_FOUND)
# Remove dynamically-linked zlib and libedit from LLVM's dependencies:
set_target_properties(LLVMSupport PROPERTIES INTERFACE_LINK_LIBRARIES "-lpthread;LLVMDemangle")
set_target_properties(LLVMLineEditor PROPERTIES INTERFACE_LINK_LIBRARIES "LLVMSupport")
if (NOT USE_INTERNAL_LZ4_LIBRARY)
set (LLVM_PATHS "/usr/local/lib/llvm")
message(STATUS "LLVM version: ${LLVM_PACKAGE_VERSION}")
message(STATUS "LLVM Include Directory: ${LLVM_INCLUDE_DIRS}")
message(STATUS "LLVM Library Directory: ${LLVM_LIBRARY_DIRS}")
message(STATUS "LLVM C++ Compiler: ${LLVM_CXXFLAGS}")
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
find_package(LLVM CONFIG PATHS ${LLVM_PATHS})
else ()
find_package(LLVM 5 CONFIG PATHS ${LLVM_PATHS})
endif ()
option(LLVM_HAS_RTTI "Enable if LLVM was build with RTTI enabled" ON)
if (LLVM_FOUND)
# Remove dynamically-linked zlib and libedit from LLVM's dependencies:
set_target_properties(LLVMSupport PROPERTIES INTERFACE_LINK_LIBRARIES "-lpthread;LLVMDemangle")
set_target_properties(LLVMLineEditor PROPERTIES INTERFACE_LINK_LIBRARIES "LLVMSupport")
option(LLVM_HAS_RTTI "Enable if LLVM was build with RTTI enabled" ON)
set (USE_EMBEDDED_COMPILER 1)
endif()
else()
set (LLVM_FOUND 1)
set (USE_EMBEDDED_COMPILER 1)
set (LLVM_VERSION "7.0.0bundled")
set (LLVM_INCLUDE_DIRS ${ClickHouse_SOURCE_DIR}/contrib/llvm/llvm/include ${ClickHouse_BINARY_DIR}/contrib/llvm/llvm/include)
set (LLVM_LIBRARY_DIRS ${ClickHouse_BINARY_DIR}/contrib/llvm/llvm)
endif()
if (LLVM_FOUND)
message(STATUS "LLVM version: ${LLVM_PACKAGE_VERSION}")
message(STATUS "LLVM include Directory: ${LLVM_INCLUDE_DIRS}")
message(STATUS "LLVM library Directory: ${LLVM_LIBRARY_DIRS}")
message(STATUS "LLVM C++ compiler flags: ${LLVM_CXXFLAGS}")
endif()
endif()

View File

@ -6,10 +6,3 @@ else ()
endif ()
message(STATUS "Using rt: ${RT_LIBRARY}")
function (target_link_rt_by_force TARGET)
if (NOT APPLE)
set (FLAGS "-Wl,-no-as-needed -lrt -Wl,-as-needed")
set_property (TARGET ${TARGET} APPEND PROPERTY LINK_FLAGS "${FLAGS}")
endif ()
endfunction ()

View File

@ -150,3 +150,7 @@ if (USE_INTERNAL_POCO_LIBRARY)
target_include_directories(Crypto PUBLIC ${OPENSSL_INCLUDE_DIR})
endif ()
endif ()
if (USE_INTERNAL_LLVM_LIBRARY)
add_subdirectory (llvm/llvm)
endif ()

View File

@ -1088,7 +1088,7 @@ class sparsegroup {
// This is equivalent to memmove(), but faster on my Intel P4,
// at least with gcc4.1 -O2 / glibc 2.3.6.
for (size_type i = settings.num_buckets; i > offset; --i)
memcpy(group + i, group + i-1, sizeof(*group));
memcpy(static_cast<void*>(group + i), group + i-1, sizeof(*group));
}
// Create space at group[offset], without special assumptions about value_type
@ -1154,7 +1154,7 @@ class sparsegroup {
// at lesat with gcc4.1 -O2 / glibc 2.3.6.
assert(settings.num_buckets > 0);
for (size_type i = offset; i < settings.num_buckets-1; ++i)
memcpy(group + i, group + i+1, sizeof(*group)); // hopefully inlined!
memcpy(static_cast<void*>(group + i), group + i+1, sizeof(*group)); // hopefully inlined!
group = settings.realloc_or_die(group, settings.num_buckets-1);
}

1
contrib/llvm vendored Submodule

@ -0,0 +1 @@
Subproject commit 6b3975cf38d5c9436e1311b7e54ad93ef1a9aa9c

View File

@ -17,6 +17,7 @@
#include <Common/Exception.h>
#include <Common/NetException.h>
#include <Common/CurrentMetrics.h>
#include <Common/DNSResolver.h>
#include <Interpreters/ClientInfo.h>
#include <Common/config.h>
@ -66,7 +67,10 @@ void Connection::connect()
{
socket = std::make_unique<Poco::Net::StreamSocket>();
}
socket->connect(resolved_address, timeouts.connection_timeout);
current_resolved_address = DNSResolver::instance().resolveAddress(host, port);
socket->connect(current_resolved_address, timeouts.connection_timeout);
socket->setReceiveTimeout(timeouts.receive_timeout);
socket->setSendTimeout(timeouts.send_timeout);
socket->setNoDelay(true);
@ -462,6 +466,14 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
}
Poco::Net::SocketAddress Connection::getResolvedAddress() const
{
if (connected)
return current_resolved_address;
return DNSResolver::instance().resolveAddress(host, port);
}
bool Connection::poll(size_t timeout_microseconds)
{
@ -571,6 +583,7 @@ void Connection::initBlockInput()
void Connection::setDescription()
{
auto resolved_address = getResolvedAddress();
description = host + ":" + toString(resolved_address.port());
auto ip_address = resolved_address.host().toString();
@ -610,7 +623,7 @@ void Connection::fillBlockExtraInfo(BlockExtraInfo & info) const
{
info.is_valid = true;
info.host = host;
info.resolved_address = resolved_address.toString();
info.resolved_address = getResolvedAddress().toString();
info.port = port;
info.user = user;
}

View File

@ -53,32 +53,7 @@ class Connection : private boost::noncopyable
friend class MultiplexedConnections;
public:
Connection(const String & host_, UInt16 port_, const String & default_database_,
const String & user_, const String & password_,
const ConnectionTimeouts & timeouts_,
const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Secure secure_ = Protocol::Secure::Disable,
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
:
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_), resolved_address(host, port),
client_name(client_name_),
compression(compression_),
secure(secure_),
timeouts(timeouts_),
sync_request_timeout(sync_request_timeout_),
log_wrapper(*this)
{
/// Don't connect immediately, only on first need.
if (user.empty())
user = "default";
setDescription();
}
Connection(const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_,
Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const ConnectionTimeouts & timeouts_,
@ -87,10 +62,8 @@ public:
Protocol::Secure secure_ = Protocol::Secure::Disable,
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
:
host(host_), port(port_),
default_database(default_database_),
user(user_), password(password_),
resolved_address(resolved_address_),
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_), current_resolved_address(host, port),
client_name(client_name_),
compression(compression_),
secure(secure_),
@ -189,6 +162,9 @@ public:
size_t outBytesCount() const { return out ? out->count() : 0; }
size_t inBytesCount() const { return in ? in->count() : 0; }
/// Returns initially resolved address
Poco::Net::SocketAddress getResolvedAddress() const;
private:
String host;
UInt16 port;
@ -196,10 +172,9 @@ private:
String user;
String password;
/** Address could be resolved beforehand and passed to constructor. Then 'host' and 'port' fields are used just for logging.
* Otherwise address is resolved in constructor. Thus, DNS based load balancing is not supported.
*/
Poco::Net::SocketAddress resolved_address;
/// Address is resolved during the first connection (or the following reconnects)
/// Use it only for logging purposes
Poco::Net::SocketAddress current_resolved_address;
/// For messages in log and in exceptions.
String description;

View File

@ -54,24 +54,7 @@ public:
Protocol::Secure secure_ = Protocol::Secure::Disable)
: Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_), resolved_address(host_, port_),
client_name(client_name_), compression(compression_),
secure{secure_},
timeouts(timeouts)
{
}
ConnectionPool(unsigned max_connections_,
const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_,
const String & default_database_,
const String & user_, const String & password_,
const ConnectionTimeouts & timeouts,
const String & client_name_ = "client",
Protocol::Compression compression_ = Protocol::Compression::Enable,
Protocol::Secure secure_ = Protocol::Secure::Disable)
: Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_), resolved_address(resolved_address_),
user(user_), password(password_),
client_name(client_name_), compression(compression_),
secure{secure_},
timeouts(timeouts)
@ -102,7 +85,7 @@ protected:
ConnectionPtr allocObject() override
{
return std::make_shared<Connection>(
host, port, resolved_address,
host, port,
default_database, user, password, timeouts,
client_name, compression, secure);
}
@ -114,11 +97,6 @@ private:
String user;
String password;
/** The address can be resolved in advance and passed to the constructor. Then `host` and `port` fields are meaningful only for logging.
* Otherwise, address is resolved in constructor. That is, DNS balancing is not supported.
*/
Poco::Net::SocketAddress resolved_address;
String client_name;
Protocol::Compression compression; /// Whether to compress data when interacting with the server.
Protocol::Secure secure; /// Whether to encrypt data when interacting with the server.

View File

@ -15,6 +15,7 @@
#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <errno.h>
/** Small wrappers for asynchronous I/O.

View File

@ -1,40 +0,0 @@
#pragma once
#include <Poco/Net/IPAddress.h>
#include <Poco/Net/SocketAddress.h>
#include <memory>
#include <ext/singleton.h>
namespace DB
{
/// A singleton implementing global and permanent DNS cache
/// It could be updated only manually via drop() method
class DNSCache : public ext::singleton<DNSCache>
{
public:
DNSCache(const DNSCache &) = delete;
/// Accepts host names like 'example.com' or '127.0.0.1' or '::1' and resolve its IP
Poco::Net::IPAddress resolveHost(const std::string & host);
/// Accepts host names like 'example.com:port' or '127.0.0.1:port' or '[::1]:port' and resolve its IP and port
Poco::Net::SocketAddress resolveHostAndPort(const std::string & host_and_port);
/// Drops all caches
void drop();
~DNSCache();
protected:
DNSCache();
friend class ext::singleton<DNSCache>;
struct Impl;
std::unique_ptr<Impl> impl;
};
}

View File

@ -1,4 +1,4 @@
#include "DNSCache.h"
#include "DNSResolver.h"
#include <Common/SimpleCache.h>
#include <Common/Exception.h>
#include <Core/Types.h>
@ -6,6 +6,7 @@
#include <Poco/Net/NetException.h>
#include <Poco/NumberParser.h>
#include <arpa/inet.h>
#include <atomic>
namespace DB
@ -74,34 +75,47 @@ static Poco::Net::IPAddress resolveIPAddressImpl(const std::string & host)
}
struct DNSCache::Impl
struct DNSResolver::Impl
{
SimpleCache<decltype(resolveIPAddressImpl), &resolveIPAddressImpl> cache_host;
/// If disabled, will not make cache lookups, will resolve addresses manually on each call
std::atomic<bool> disable_cache{false};
};
DNSCache::DNSCache() : impl(std::make_unique<DNSCache::Impl>()) {}
DNSResolver::DNSResolver() : impl(std::make_unique<DNSResolver::Impl>()) {}
Poco::Net::IPAddress DNSCache::resolveHost(const std::string & host)
Poco::Net::IPAddress DNSResolver::resolveHost(const std::string & host)
{
return impl->cache_host(host);
return !impl->disable_cache ? impl->cache_host(host) : resolveIPAddressImpl(host);
}
Poco::Net::SocketAddress DNSCache::resolveHostAndPort(const std::string & host_and_port)
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host_and_port)
{
String host;
UInt16 port;
splitHostAndPort(host_and_port, host, port);
return Poco::Net::SocketAddress(impl->cache_host(host), port);
return !impl->disable_cache ? Poco::Net::SocketAddress(impl->cache_host(host), port) : Poco::Net::SocketAddress(host_and_port);
}
void DNSCache::drop()
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, UInt16 port)
{
return !impl->disable_cache ? Poco::Net::SocketAddress(impl->cache_host(host), port) : Poco::Net::SocketAddress(host, port);
}
void DNSResolver::dropCache()
{
impl->cache_host.drop();
}
DNSCache::~DNSCache() = default;
void DNSResolver::setDisableCacheFlag(bool is_disabled)
{
impl->disable_cache = is_disabled;
}
DNSResolver::~DNSResolver() = default;
}

View File

@ -0,0 +1,46 @@
#pragma once
#include <Poco/Net/IPAddress.h>
#include <Poco/Net/SocketAddress.h>
#include <memory>
#include <ext/singleton.h>
#include <Core/Types.h>
namespace DB
{
/// A singleton implementing DNS names resolving with optional permanent DNS cache
/// The cache could be updated only manually via drop() method
class DNSResolver : public ext::singleton<DNSResolver>
{
public:
DNSResolver(const DNSResolver &) = delete;
/// Accepts host names like 'example.com' or '127.0.0.1' or '::1' and resolve its IP
Poco::Net::IPAddress resolveHost(const std::string & host);
/// Accepts host names like 'example.com:port' or '127.0.0.1:port' or '[::1]:port' and resolve its IP and port
Poco::Net::SocketAddress resolveAddress(const std::string & host_and_port);
Poco::Net::SocketAddress resolveAddress(const std::string & host, UInt16 port);
/// Disables caching
void setDisableCacheFlag(bool is_disabled = true);
/// Drops all caches
void dropCache();
~DNSResolver();
protected:
DNSResolver();
friend class ext::singleton<DNSResolver>;
struct Impl;
std::unique_ptr<Impl> impl;
};
}

View File

@ -1,4 +1,3 @@
#include <errno.h>
#include <string.h>
#include <cxxabi.h>

View File

@ -408,7 +408,7 @@ protected:
/// Copy to a new location and zero the old one.
x.setHash(hash_value);
memcpy(&buf[place_value], &x, sizeof(x));
memcpy(static_cast<void*>(&buf[place_value]), &x, sizeof(x));
x.setZero();
/// Then the elements that previously were in collision with this can move to the old place.
@ -726,7 +726,7 @@ public:
{
size_t place_value = findEmptyCell(grower.place(hash_value));
memcpy(&buf[place_value], cell, sizeof(*cell));
memcpy(static_cast<void*>(&buf[place_value]), cell, sizeof(*cell));
++m_size;
if (unlikely(grower.overflow(m_size)))
@ -897,7 +897,7 @@ public:
this->clearHasZero();
m_size = 0;
memset(buf, 0, grower.bufSize() * sizeof(*buf));
memset(static_cast<void*>(buf), 0, grower.bufSize() * sizeof(*buf));
}
/// After executing this function, the table can only be destroyed,

View File

@ -140,7 +140,9 @@
M(RWLockAcquiredReadLocks) \
M(RWLockAcquiredWriteLocks) \
M(RWLockReadersWaitMilliseconds) \
M(RWLockWritersWaitMilliseconds)
M(RWLockWritersWaitMilliseconds) \
\
M(NetworkErrors)
namespace ProfileEvents
{

View File

@ -86,8 +86,8 @@ public:
if (*needle < 0x80u)
{
first_needle_symbol_is_ascii = true;
l = static_cast<const UInt8>(std::tolower(*needle));
u = static_cast<const UInt8>(std::toupper(*needle));
l = std::tolower(*needle);
u = std::toupper(*needle);
}
else
{

View File

@ -10,7 +10,7 @@
namespace DB
{
bool isLocalAddress(const Poco::Net::SocketAddress & address)
bool isLocalAddress(const Poco::Net::IPAddress & address)
{
static auto interfaces = Poco::Net::NetworkInterface::list();
@ -21,14 +21,14 @@ bool isLocalAddress(const Poco::Net::SocketAddress & address)
* Theoretically, this may not be correct - depends on `route` setting
* - through which interface we will actually access the specified address.
*/
return interface.address().length() == address.host().length()
&& 0 == memcmp(interface.address().addr(), address.host().addr(), address.host().length());
return interface.address().length() == address.length()
&& 0 == memcmp(interface.address().addr(), address.addr(), address.length());
});
}
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port)
{
return clickhouse_port == address.port() && isLocalAddress(address);
return clickhouse_port == address.port() && isLocalAddress(address.host());
}

View File

@ -1,6 +1,8 @@
#pragma once
#include <common/Types.h>
#include <Poco/Net/IPAddress.h>
namespace Poco
{
@ -20,10 +22,12 @@ namespace DB
* - only the first address is taken for each network interface;
* - the routing rules that affect which network interface we go to the specified address are not checked.
*/
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port);
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port);
bool isLocalAddress(const Poco::Net::SocketAddress & address);
bool isLocalAddress(const Poco::Net::IPAddress & address);
/// Returns number of different bytes in hostnames, used for load balancing
size_t getHostNameDifference(const std::string & local_hostname, const std::string & host);
}

View File

@ -5,6 +5,7 @@
#include <Poco/File.h>
#include <Common/Exception.h>
#include <port/unistd.h>
#include <errno.h>
namespace DB

View File

@ -150,7 +150,7 @@ private:
--ptr->refcount;
if (!ptr->refcount)
{
if (std::uncaught_exception())
if (std::uncaught_exceptions())
delete ptr;
else
ptr->output_blocks->push_back(ptr);

View File

@ -18,7 +18,7 @@
#include <Interpreters/ExpressionActions.h>
#include <ext/range.h>
#include <common/intExp.h>
#include <boost/math/common_factor.hpp>
#include <boost/integer/common_factor.hpp>
#if USE_EMBEDDED_COMPILER
#pragma GCC diagnostic push
@ -620,7 +620,7 @@ struct GCDImpl
{
throwIfDivisionLeadsToFPE(typename NumberTraits::ToInteger<A>::Type(a), typename NumberTraits::ToInteger<B>::Type(b));
throwIfDivisionLeadsToFPE(typename NumberTraits::ToInteger<B>::Type(b), typename NumberTraits::ToInteger<A>::Type(a));
return boost::math::gcd(
return boost::integer::gcd(
typename NumberTraits::ToInteger<Result>::Type(a),
typename NumberTraits::ToInteger<Result>::Type(b));
}
@ -640,7 +640,7 @@ struct LCMImpl
{
throwIfDivisionLeadsToFPE(typename NumberTraits::ToInteger<A>::Type(a), typename NumberTraits::ToInteger<B>::Type(b));
throwIfDivisionLeadsToFPE(typename NumberTraits::ToInteger<B>::Type(b), typename NumberTraits::ToInteger<A>::Type(a));
return boost::math::lcm(
return boost::integer::lcm(
typename NumberTraits::ToInteger<Result>::Type(a),
typename NumberTraits::ToInteger<Result>::Type(b));
}

View File

@ -7,6 +7,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <optional>

View File

@ -3,6 +3,7 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Common/ProfileEvents.h>
#include <errno.h>
namespace ProfileEvents

View File

@ -4,7 +4,7 @@
#include <Common/config.h>
#include <Core/Types.h>
#include <IO/ReadBufferFromIStream.h>
#include <Common/DNSCache.h>
#include <Common/DNSResolver.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Version.h>
@ -43,7 +43,7 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
new Poco::Net::HTTPClientSession)
}
{
session->setHost(DNSCache::instance().resolveHost(uri.getHost()).toString());
session->setHost(DNSResolver::instance().resolveHost(uri.getHost()).toString());
session->setPort(uri.getPort());
#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000

View File

@ -6,6 +6,7 @@
#include <limits>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
namespace ProfileEvents

View File

@ -1,5 +1,6 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <Common/ProfileEvents.h>

View File

@ -1,5 +1,5 @@
#include <Interpreters/Cluster.h>
#include <Common/DNSCache.h>
#include <Common/DNSResolver.h>
#include <Common/escapeForFileName.h>
#include <Common/isLocalAddress.h>
#include <Common/SimpleCache.h>
@ -29,7 +29,7 @@ namespace
/// Default shard weight.
static constexpr UInt32 default_weight = 1;
inline bool isLocal(const Cluster::Address & address, UInt16 clickhouse_port)
inline bool isLocal(const Cluster::Address & address, const Poco::Net::SocketAddress & resolved_address, UInt16 clickhouse_port)
{
/// If there is replica, for which:
/// - its port is the same that the server is listening;
@ -41,13 +41,7 @@ inline bool isLocal(const Cluster::Address & address, UInt16 clickhouse_port)
/// Also, replica is considered non-local, if it has default database set
/// (only reason is to avoid query rewrite).
return address.default_database.empty() && isLocalAddress(address.resolved_address, clickhouse_port);
}
Poco::Net::SocketAddress resolveSocketAddress(const String & host, UInt16 port)
{
return Poco::Net::SocketAddress(DNSCache::instance().resolveHost(host), port);
return address.default_database.empty() && isLocalAddress(resolved_address, clickhouse_port);
}
}
@ -56,15 +50,15 @@ Poco::Net::SocketAddress resolveSocketAddress(const String & host, UInt16 port)
Cluster::Address::Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
UInt16 clickhouse_port = config.getInt("tcp_port", 0);
UInt16 clickhouse_port = static_cast<UInt16>(config.getInt("tcp_port", 0));
host_name = config.getString(config_prefix + ".host");
port = static_cast<UInt16>(config.getInt(config_prefix + ".port"));
resolved_address = resolveSocketAddress(host_name, port);
user = config.getString(config_prefix + ".user", "default");
password = config.getString(config_prefix + ".password", "");
default_database = config.getString(config_prefix + ".default_database", "");
is_local = isLocal(*this, clickhouse_port);
initially_resolved_address = DNSResolver::instance().resolveAddress(host_name, port);
is_local = isLocal(*this, initially_resolved_address, clickhouse_port);
secure = config.getBool(config_prefix + ".secure", false) ? Protocol::Secure::Enable : Protocol::Secure::Disable;
compression = config.getBool(config_prefix + ".compression", true) ? Protocol::Compression::Enable : Protocol::Compression::Disable;
}
@ -74,11 +68,11 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
: user(user_), password(password_)
{
auto parsed_host_port = parseAddress(host_port_, clickhouse_port);
resolved_address = resolveSocketAddress(parsed_host_port.first, parsed_host_port.second);
host_name = parsed_host_port.first;
port = parsed_host_port.second;
is_local = isLocal(*this, clickhouse_port);
initially_resolved_address = DNSResolver::instance().resolveAddress(parsed_host_port.first, parsed_host_port.second);
is_local = isLocal(*this, initially_resolved_address, clickhouse_port);
}
@ -94,7 +88,16 @@ String Cluster::Address::toString(const String & host_name, UInt16 port)
String Cluster::Address::readableString() const
{
return host_name + ':' + DB::toString(port);
String res;
/// If it looks like IPv6 address add braces to avoid ambiguity in ipv6_host:port notation
if (host_name.find_first_of(':') != std::string::npos && !host_name.empty() && host_name.back() != ']')
res += '[' + host_name + ']';
else
res += host_name;
res += ':' + DB::toString(port);
return res;
}
void Cluster::Address::fromString(const String & host_port_string, String & host_name, UInt16 & port)
@ -113,8 +116,8 @@ String Cluster::Address::toStringFull() const
return
escapeForFileName(user) +
(password.empty() ? "" : (':' + escapeForFileName(password))) + '@' +
escapeForFileName(resolved_address.host().toString()) + ':' +
std::to_string(resolved_address.port()) +
escapeForFileName(host_name) + ':' +
std::to_string(port) +
(default_database.empty() ? "" : ('#' + escapeForFileName(default_database)))
+ ((secure == Protocol::Secure::Enable) ? "+secure" : "");
}
@ -220,7 +223,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
{
ConnectionPoolPtr pool = std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
address.host_name, address.port, address.resolved_address,
address.host_name, address.port,
address.default_database, address.user, address.password,
ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings).getSaturated(settings.max_execution_time),
"server", address.compression, address.secure);
@ -252,7 +255,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
/// in case of internal_replication we will be appending names to dir_name_for_internal_replication
/// In case of internal_replication we will be appending names to dir_name_for_internal_replication
std::string dir_name_for_internal_replication;
auto first = true;
@ -303,7 +306,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
{
auto replica_pool = std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
replica.host_name, replica.port, replica.resolved_address,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
ConnectionTimeouts::getTCPTimeoutsWithFailover(settings).getSaturated(settings.max_execution_time),
"server", replica.compression, replica.secure);
@ -367,7 +370,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
{
auto replica_pool = std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
replica.host_name, replica.port, replica.resolved_address,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
ConnectionTimeouts::getTCPTimeoutsWithFailover(settings).getSaturated(settings.max_execution_time),
"server", replica.compression, replica.secure);

View File

@ -52,13 +52,15 @@ public:
* </replica>
* </shard>
*/
Poco::Net::SocketAddress resolved_address;
String host_name;
UInt16 port;
String user;
String password;
String default_database; /// this database is selected when no database is specified for Distributed table
/// This database is selected when no database is specified for Distributed table
String default_database;
UInt32 replica_num;
/// The locality is determined at the initialization, and is not changed even if DNS is changed
bool is_local;
Protocol::Compression compression = Protocol::Compression::Enable;
Protocol::Secure secure = Protocol::Secure::Disable;
@ -79,6 +81,15 @@ public:
/// Retrurns escaped user:password@resolved_host_address:resolved_host_port#default_database
String toStringFull() const;
/// Returns initially resolved address
Poco::Net::SocketAddress getResolvedAddress() const
{
return initially_resolved_address;
}
private:
Poco::Net::SocketAddress initially_resolved_address;
};
using Addresses = std::vector<Address>;

View File

@ -14,7 +14,7 @@ BlockExtraInfo toBlockExtraInfo(const Cluster::Address & address)
{
BlockExtraInfo block_extra_info;
block_extra_info.host = address.host_name;
block_extra_info.resolved_address = address.resolved_address.toString();
block_extra_info.resolved_address = address.getResolvedAddress().toString();
block_extra_info.port = address.port;
block_extra_info.user = address.user;
block_extra_info.is_valid = true;

View File

@ -39,7 +39,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/Context.h>
#include <Common/DNSCache.h>
#include <Common/DNSResolver.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/UncompressedCache.h>
#include <Parsers/ASTCreateQuery.h>
@ -1406,9 +1406,28 @@ std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name
void Context::reloadClusterConfig()
{
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef();
shared->clusters = std::make_unique<Clusters>(config, settings);
while (true)
{
ConfigurationPtr cluster_config;
{
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
cluster_config = shared->clusters_config;
}
auto & config = cluster_config ? *cluster_config : getConfigRef();
auto new_clusters = std::make_unique<Clusters>(config, settings);
{
std::lock_guard<std::mutex> lock(shared->clusters_mutex);
if (shared->clusters_config.get() == cluster_config.get())
{
shared->clusters = std::move(new_clusters);
return;
}
/// Clusters config has been suddenly changed, recompute clusters
}
}
}

View File

@ -16,7 +16,7 @@
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Common/DNSCache.h>
#include <Common/DNSResolver.h>
#include <Common/Macros.h>
#include <Common/getFQDNOrHostName.h>
@ -39,6 +39,7 @@
#include <random>
#include <pcg_random.hpp>
#include <Poco/Net/NetException.h>
namespace DB
@ -93,9 +94,9 @@ struct HostID
{
try
{
return DB::isLocalAddress(Poco::Net::SocketAddress(host_name, port), clickhouse_port);
return DB::isLocalAddress(DNSResolver::instance().resolveAddress(host_name, port), clickhouse_port);
}
catch (const Poco::Exception & e)
catch (const Poco::Net::NetException & e)
{
/// Avoid "Host not found" exceptions
return false;
@ -480,7 +481,7 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
{
const Cluster::Address & address = shards[shard_num][replica_num];
if (isLocalAddress(address.resolved_address, context.getTCPPort()))
if (isLocalAddress(address.getResolvedAddress(), context.getTCPPort()))
{
if (found_via_resolving)
{
@ -634,43 +635,51 @@ void DDLWorker::processTaskAlter(
if (execute_once_on_replica && !config_is_replicated_shard)
{
throw Exception("Table " + ast_alter->table + " is replicated, but shard #" + toString(task.host_shard_num + 1) +
" isn't replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
" isn't replicated according to its cluster definition."
" Possibly <internal_replication>true</internal_replication> is forgotten in the cluster config.",
ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
}
else if (!execute_once_on_replica && config_is_replicated_shard)
if (!execute_once_on_replica && config_is_replicated_shard)
{
throw Exception("Table " + ast_alter->table + " isn't replicated, but shard #" + toString(task.host_shard_num + 1) +
" is replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
}
if (execute_once_on_replica)
/// Generate unique name for shard node, it will be used to execute the query by only single host
/// Shard node name has format 'replica_name1,replica_name2,...,replica_nameN'
/// Where replica_name is 'replica_config_host_name:replica_port'
auto get_shard_name = [] (const Cluster::Addresses & shard_addresses)
{
/// Generate unique name for shard node, it will be used to execute the query by only single host
/// Shard node name has format 'replica_name1,replica_name2,...,replica_nameN'
/// Where replica_name is 'escape(replica_ip_address):replica_port'
/// FIXME: this replica_name could be changed after replica restart
Strings replica_names;
for (const Cluster::Address & address : task.cluster->getShardsAddresses().at(task.host_shard_num))
replica_names.emplace_back(address.resolved_address.host().toString());
for (const Cluster::Address & address : shard_addresses)
replica_names.emplace_back(address.readableString());
std::sort(replica_names.begin(), replica_names.end());
String shard_node_name;
String res;
for (auto it = replica_names.begin(); it != replica_names.end(); ++it)
shard_node_name += *it + (std::next(it) != replica_names.end() ? "," : "");
res += *it + (std::next(it) != replica_names.end() ? "," : "");
return res;
};
if (execute_once_on_replica)
{
String shard_node_name = get_shard_name(task.cluster->getShardsAddresses().at(task.host_shard_num));
String shard_path = node_path + "/shards/" + shard_node_name;
String is_executed_path = shard_path + "/executed";
zookeeper->createAncestors(shard_path + "/");
bool alter_executed_by_any_replica = false;
bool is_executed_by_any_replica = false;
{
auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str);
pcg64 rng(randomSeed());
for (int num_tries = 0; num_tries < 10; ++num_tries)
static const size_t max_tries = 20;
for (size_t num_tries = 0; num_tries < max_tries; ++num_tries)
{
if (zookeeper->exists(is_executed_path))
{
alter_executed_by_any_replica = true;
is_executed_by_any_replica = true;
break;
}
@ -685,16 +694,19 @@ void DDLWorker::processTaskAlter(
zookeeper->create(is_executed_path, task.host_id_str, zkutil::CreateMode::Persistent);
lock->unlock();
alter_executed_by_any_replica = true;
is_executed_by_any_replica = true;
break;
}
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, 1)(rng)));
std::this_thread::sleep_for(std::chrono::milliseconds(std::uniform_int_distribution<long>(0, 1000)(rng)));
}
}
if (!alter_executed_by_any_replica)
task.execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED, "Cannot enqueue replicated DDL query");
if (!is_executed_by_any_replica)
{
task.execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED,
"Cannot enqueue replicated DDL query for a replicated shard");
}
}
else
{

View File

@ -0,0 +1,118 @@
#include "DNSCacheUpdater.h"
#include <Common/DNSResolver.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Common/ProfileEvents.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>
namespace ProfileEvents
{
extern Event NetworkErrors;
}
namespace DB
{
using BackgroundProcessingPoolTaskInfo = BackgroundProcessingPool::TaskInfo;
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int ALL_CONNECTION_TRIES_FAILED;
}
/// Call it inside catch section
/// Returns true if it is a network error
static bool isNetworkError()
{
try
{
throw;
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::TIMEOUT_EXCEEDED || e.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
return true;
}
catch (Poco::Net::DNSException & e)
{
return true;
}
catch (Poco::TimeoutException & e)
{
return true;
}
catch (...)
{
/// Do nothing
}
return false;
}
DNSCacheUpdater::DNSCacheUpdater(Context & context_)
: context(context_), pool(context_.getBackgroundPool())
{
task_handle = pool.addTask([this] () { return run(); });
}
bool DNSCacheUpdater::run()
{
/// TODO: Ensusre that we get global counter (not thread local)
auto num_current_network_exceptions = ProfileEvents::counters[ProfileEvents::NetworkErrors].load(std::memory_order_relaxed);
if (num_current_network_exceptions >= last_num_network_erros + min_errors_to_update_cache
&& time(nullptr) > last_update_time + min_update_period_seconds)
{
try
{
LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"), "Updating DNS cache");
DNSResolver::instance().dropCache();
context.reloadClusterConfig();
last_num_network_erros = num_current_network_exceptions;
last_update_time = time(nullptr);
return true;
}
catch (...)
{
/// Do not increment ProfileEvents::NetworkErrors twice
if (isNetworkError())
return false;
throw;
}
}
/// According to BackgroundProcessingPool logic, if task has done work, it could be executed again immediately.
return false;
}
DNSCacheUpdater::~DNSCacheUpdater()
{
if (task_handle)
pool.removeTask(task_handle);
task_handle.reset();
}
bool DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded()
{
if (isNetworkError())
{
ProfileEvents::increment(ProfileEvents::NetworkErrors);
return true;
}
return false;
}
}

View File

@ -0,0 +1,38 @@
#pragma once
#include <memory>
namespace DB
{
class Context;
class BackgroundProcessingPool;
class BackgroundProcessingPoolTaskInfo;
/// Add a task to BackgroundProcessingPool that watch for ProfileEvents::NetworkErrors and updates DNS cache if it has increased
class DNSCacheUpdater
{
public:
explicit DNSCacheUpdater(Context & context);
~DNSCacheUpdater();
/// Checks if it is a network error and increments ProfileEvents::NetworkErrors
static bool incrementNetworkErrorEventsIfNeeded();
private:
bool run();
Context & context;
BackgroundProcessingPool & pool;
std::shared_ptr<BackgroundProcessingPoolTaskInfo> task_handle;
size_t last_num_network_erros = 0;
time_t last_update_time = 0;
static constexpr size_t min_errors_to_update_cache = 3;
static constexpr time_t min_update_period_seconds = 45;
};
}

View File

@ -1,5 +1,5 @@
#include <Interpreters/InterpreterSystemQuery.h>
#include <Common/DNSCache.h>
#include <Common/DNSResolver.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionaries.h>
#include <Interpreters/EmbeddedDictionaries.h>
@ -63,6 +63,10 @@ BlockIO InterpreterSystemQuery::execute()
using Type = ASTSystemQuery::Type;
/// Use global context with fresh system profile settings
Context system_context = context.getGlobalContext();
system_context.setSetting("profile", context.getSystemProfileName());
switch (query.type)
{
case Type::SHUTDOWN:
@ -74,31 +78,34 @@ BlockIO InterpreterSystemQuery::execute()
throwFromErrno("System call kill(0, SIGKILL) failed", ErrorCodes::CANNOT_KILL);
break;
case Type::DROP_DNS_CACHE:
DNSCache::instance().drop();
DNSResolver::instance().dropCache();
/// Reinitialize clusters to update their resolved_addresses
context.reloadClusterConfig();
system_context.reloadClusterConfig();
break;
case Type::DROP_MARK_CACHE:
context.dropMarkCache();
system_context.dropMarkCache();
break;
case Type::DROP_UNCOMPRESSED_CACHE:
context.dropUncompressedCache();
system_context.dropUncompressedCache();
break;
case Type::RELOAD_DICTIONARY:
context.getExternalDictionaries().reloadDictionary(query.target_dictionary);
system_context.getExternalDictionaries().reloadDictionary(query.target_dictionary);
break;
case Type::RELOAD_DICTIONARIES:
{
auto status = getOverallExecutionStatusOfCommands(
[&] { context.getExternalDictionaries().reload(); },
[&] { context.getEmbeddedDictionaries().reload(); }
[&] { system_context.getExternalDictionaries().reload(); },
[&] { system_context.getEmbeddedDictionaries().reload(); }
);
if (status.code != 0)
throw Exception(status.message, status.code);
break;
}
case Type::RELOAD_EMBEDDED_DICTIONARIES:
system_context.getEmbeddedDictionaries().reload();
break;
case Type::RELOAD_CONFIG:
context.reloadConfig();
system_context.reloadConfig();
break;
case Type::STOP_LISTEN_QUERIES:
case Type::START_LISTEN_QUERIES:

View File

@ -23,6 +23,7 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/executeQuery.h>
#include "DNSCacheUpdater.h"
namespace ProfileEvents
@ -377,6 +378,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (!internal)
onExceptionBeforeStart(query, context, current_time);
DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded();
throw;
}

View File

@ -39,6 +39,8 @@ const char * ASTSystemQuery::typeToString(Type type)
return "RELOAD DICTIONARY";
case Type::RELOAD_DICTIONARIES:
return "RELOAD DICTIONARIES";
case Type::RELOAD_EMBEDDED_DICTIONARIES:
return "RELOAD EMBEDDED DICTIONARIES";
case Type::RELOAD_CONFIG:
return "RELOAD CONFIG";
case Type::STOP_MERGES:

View File

@ -24,6 +24,7 @@ public:
SYNC_REPLICA,
RELOAD_DICTIONARY,
RELOAD_DICTIONARIES,
RELOAD_EMBEDDED_DICTIONARIES,
RELOAD_CONFIG,
STOP_MERGES,
START_MERGES,

View File

@ -29,6 +29,7 @@
#include <Common/typeid_cast.h>
#include <Common/ClickHouseRevision.h>
#include <Common/formatReadable.h>
#include <Common/DNSResolver.h>
#include <Common/escapeForFileName.h>
#include <Client/Connection.h>
#include <Interpreters/Context.h>
@ -64,6 +65,7 @@
#include <Storages/StorageDistributed.h>
#include <Databases/DatabaseMemory.h>
#include <Server/StatusFile.h>
#include <daemon/OwnPatternFormatter.h>
namespace DB
@ -610,7 +612,7 @@ static ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, co
res.is_remote = 1;
for (auto & replica : replicas)
{
if (isLocalAddress(replica.resolved_address))
if (isLocalAddress(DNSResolver::instance().resolveHost(replica.host_name)))
{
res.is_remote = 0;
break;

View File

@ -0,0 +1 @@
Compiler-7.0.0

View File

@ -1,6 +1,7 @@
#pragma once
#include <signal.h>
#include <errno.h>
#include <Common/Exception.h>

View File

@ -2,6 +2,7 @@
#include <memory>
#include <sys/resource.h>
#include <errno.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Net/HTTPServer.h>
#include <Poco/Net/NetException.h>
@ -10,6 +11,7 @@
#include <common/ErrorHandlers.h>
#include <common/getMemoryAmount.h>
#include <Common/ClickHouseRevision.h>
#include <Common/DNSResolver.h>
#include <Common/CurrentMetrics.h>
#include <Common/Macros.h>
#include <Common/StringUtils/StringUtils.h>
@ -39,6 +41,9 @@
#if Poco_NetSSL_FOUND
#include <Poco/Net/Context.h>
#include <Poco/Net/SecureServerSocket.h>
#include <Interpreters/DNSCacheUpdater.h>
#endif
namespace CurrentMetrics
@ -321,6 +326,18 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setDDLWorker(std::make_shared<DDLWorker>(ddl_zookeeper_path, *global_context, &config(), "distributed_ddl"));
}
std::unique_ptr<DNSCacheUpdater> dns_cache_updater;
if (config().has("disable_internal_dns_cache") && config().getInt("disable_internal_dns_cache"))
{
/// Disable DNS caching at all
DNSResolver::instance().setDisableCacheFlag();
}
else
{
/// Initialize a watcher updating DNS cache in case of network errors
dns_cache_updater = std::make_unique<DNSCacheUpdater>(*global_context);
}
{
Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0);

View File

@ -4,6 +4,7 @@
#include <sys/stat.h>
#include <sys/file.h>
#include <fcntl.h>
#include <errno.h>
#include <Poco/File.h>
#include <common/logger_useful.h>

View File

@ -362,4 +362,7 @@
The directory will be created if it doesn't exist.
-->
<format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
<!-- Uncomment to disable ClickHouse internal DNS caching. -->
<!-- <disable_internal_dns_cache>1</disable_internal_dns_cache> -->
</yandex>

View File

@ -6,6 +6,7 @@
#include <IO/WriteHelpers.h>
#include <common/logger_useful.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Interpreters/DNSCacheUpdater.h>
#include <pcg_random.hpp>
#include <random>
@ -25,7 +26,7 @@ constexpr double BackgroundProcessingPool::sleep_seconds;
constexpr double BackgroundProcessingPool::sleep_seconds_random_part;
void BackgroundProcessingPool::TaskInfo::wake()
void BackgroundProcessingPoolTaskInfo::wake()
{
if (removed)
return;
@ -36,7 +37,7 @@ void BackgroundProcessingPool::TaskInfo::wake()
std::unique_lock<std::mutex> lock(pool.tasks_mutex);
auto next_time_to_execute = iterator->first;
TaskHandle this_task_handle = iterator->second;
auto this_task_handle = iterator->second;
/// If this task was done nothing at previous time and it has to sleep, then cancel sleep time.
if (next_time_to_execute > current_time)
@ -180,6 +181,7 @@ void BackgroundProcessingPool::threadFunction()
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded();
}
if (shutdown)

View File

@ -16,6 +16,9 @@
namespace DB
{
class BackgroundProcessingPool;
class BackgroundProcessingPoolTaskInfo;
/** Using a fixed number of threads, perform an arbitrary number of tasks in an infinite loop.
* In this case, one task can run simultaneously from different threads.
* Designed for tasks that perform continuous background work (for example, merge).
@ -27,29 +30,7 @@ class BackgroundProcessingPool
public:
/// Returns true, if some useful work was done. In that case, thread will not sleep before next run of this task.
using Task = std::function<bool()>;
class TaskInfo
{
public:
/// Wake up any thread.
void wake();
TaskInfo(BackgroundProcessingPool & pool_, const Task & function_) : pool(pool_), function(function_) {}
private:
friend class BackgroundProcessingPool;
BackgroundProcessingPool & pool;
Task function;
/// Read lock is hold when task is executed.
std::shared_mutex rwlock;
std::atomic<bool> removed {false};
std::multimap<Poco::Timestamp, std::shared_ptr<TaskInfo>>::iterator iterator;
};
using TaskInfo = BackgroundProcessingPoolTaskInfo;
using TaskHandle = std::shared_ptr<TaskInfo>;
@ -65,7 +46,9 @@ public:
~BackgroundProcessingPool();
private:
protected:
friend class BackgroundProcessingPoolTaskInfo;
using Tasks = std::multimap<Poco::Timestamp, TaskHandle>; /// key is desired next time to execute (priority).
using Threads = std::vector<std::thread>;
@ -87,4 +70,27 @@ private:
using BackgroundProcessingPoolPtr = std::shared_ptr<BackgroundProcessingPool>;
class BackgroundProcessingPoolTaskInfo
{
public:
/// Wake up any thread.
void wake();
BackgroundProcessingPoolTaskInfo(BackgroundProcessingPool & pool_, const BackgroundProcessingPool::Task & function_)
: pool(pool_), function(function_) {}
protected:
friend class BackgroundProcessingPool;
BackgroundProcessingPool & pool;
BackgroundProcessingPool::Task function;
/// Read lock is hold when task is executed.
std::shared_mutex rwlock;
std::atomic<bool> removed {false};
std::multimap<Poco::Timestamp, std::shared_ptr<BackgroundProcessingPoolTaskInfo>>::iterator iterator;
};
}

View File

@ -147,8 +147,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
}
size_t recommended_rows = estimateNumRows(*task, task->range_reader);
size_t rows_to_read = std::max(static_cast<decltype(max_block_size_rows)>(1),
std::min(max_block_size_rows, recommended_rows));
size_t rows_to_read = std::max(1, std::min(max_block_size_rows, recommended_rows));
auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);

View File

@ -69,8 +69,8 @@ public:
readIntBinary(mrk_mark.offset_in_decompressed_block, mrk_hashing_buf);
bool has_alternative_mark = false;
MarkInCompressedFile alternative_data_mark;
MarkInCompressedFile data_mark;
MarkInCompressedFile alternative_data_mark = {};
MarkInCompressedFile data_mark = {};
/// If the mark should be exactly at the border of blocks, we can also use a mark pointing to the end of previous block,
/// and the beginning of next.

View File

@ -1,5 +1,6 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <errno.h>
#include <map>
#include <optional>

View File

@ -1,5 +1,6 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <errno.h>
#include <map>

View File

@ -5,6 +5,7 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Common/DNSResolver.h>
#include <Interpreters/Context.h>
namespace DB
@ -51,7 +52,7 @@ BlockInputStreams StorageSystemClusters::read(
res_columns[i++]->insert(static_cast<UInt64>(shard_info.weight));
res_columns[i++]->insert(static_cast<UInt64>(address.replica_num));
res_columns[i++]->insert(address.host_name);
res_columns[i++]->insert(address.resolved_address.host().toString());
res_columns[i++]->insert(DNSResolver::instance().resolveHost(address.host_name).toString());
res_columns[i++]->insert(static_cast<UInt64>(address.port));
res_columns[i++]->insert(static_cast<UInt64>(shard_info.isLocal()));
res_columns[i++]->insert(address.user);

View File

@ -58,6 +58,7 @@ def test_DROP_DNS_CACHE(started_cluster):
instance = cluster.instances['ch1']
instance.exec_in_container(['bash', '-c', 'echo 127.255.255.255 lost_host > /etc/hosts'], privileged=True, user='root')
instance.query("SYSTEM DROP DNS CACHE")
with pytest.raises(QueryRuntimeException):
instance.query("SELECT * FROM remote('lost_host', 'system', 'one')")

View File

@ -1,5 +1,6 @@
DROP TABLE IF EXISTS test.rename1;
DROP TABLE IF EXISTS test.rename2;
DROP TABLE IF EXISTS test.rename3;
CREATE TABLE test.rename1 (p Int64, i Int64, v UInt64) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/test/tables/rename', '1', v) PARTITION BY p ORDER BY i;
CREATE TABLE test.rename2 (p Int64, i Int64, v UInt64) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/test/tables/rename', '2', v) PARTITION BY p ORDER BY i;

View File

@ -71,6 +71,7 @@
</shard>
</test_shard_localhost_secure>
</remote_servers>
<include_from/>
<zookeeper incl="zookeeper-servers" optional="true" />
<macros incl="macros" optional="true" />
<builtin_dictionaries_reload_interval>3600</builtin_dictionaries_reload_interval>

5
debian/.pbuilderrc vendored
View File

@ -173,9 +173,12 @@ else
export CMAKE_FLAGS="-DENABLE_EMBEDDED_COMPILER=0 $CMAKE_FLAGS"
fi
# will test symbols
# Will test symbols
#EXTRAPACKAGES+=" gdb "
# For killall in pbuilder-hooks:
EXTRAPACKAGES+=" psmisc "
[[ $CCACHE_PREFIX == 'distcc' ]] && EXTRAPACKAGES+=" $CCACHE_PREFIX "
export DEB_BUILD_OPTIONS=parallel=`nproc`

3
debian/control vendored
View File

@ -5,7 +5,8 @@ Maintainer: Alexey Milovidov <milovidov@yandex-team.ru>
Build-Depends: debhelper (>= 9),
cmake3 | cmake,
ninja-build,
gcc-7, g++-7,
gcc-7 [amd64 i386], g++-7 [amd64 i386],
clang-6.0 [arm64 armhf] | clang-5.0 [arm64 armhf],
libc6-dev,
libmariadbclient-dev | default-libmysqlclient-dev | libmysqlclient-dev,
libicu-dev,

View File

@ -2,6 +2,6 @@
mkdir -p /server/build_docker
cd /server/build_docker
cmake /server -DENABLE_EMBEDDED_COMPILER=1 -DENABLE_TESTS=0
cmake /server -DENABLE_TESTS=0
make -j $(nproc || grep -c ^processor /proc/cpuinfo)
#ctest -V -j $(nproc || grep -c ^processor /proc/cpuinfo)

View File

@ -14,7 +14,7 @@ namespace ext
std::decay_t<To> bit_cast(const From & from)
{
To res {};
memcpy(&res, &from, std::min(sizeof(res), sizeof(from)));
memcpy(static_cast<void*>(&res), &from, std::min(sizeof(res), sizeof(from)));
return res;
};

View File

@ -1,7 +1,2 @@
add_executable (mysqlxx_test mysqlxx_test.cpp)
add_executable (failover failover.cpp)
target_link_libraries (mysqlxx_test mysqlxx)
target_link_libraries (failover mysqlxx ${Poco_Util_LIBRARY} ${Poco_Foundation_LIBRARY})
target_link_rt_by_force (failover)

View File

@ -1,36 +0,0 @@
#include <mysqlxx/PoolWithFailover.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/Logger.h>
#include <Poco/ConsoleChannel.h>
#include <iostream>
class App : public Poco::Util::Application
{
public:
App() {}
};
int main()
{
App app;
app.loadConfiguration("failover.xml");
Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
mysqlxx::PoolWithFailover pool("mysql_goals");
for (size_t i = 0; i < 10; ++i)
{
mysqlxx::PoolWithFailover::Entry conn = pool.Get();
mysqlxx::Query Q = conn->query();
Q << "SELECT count(*) FROM counters";
mysqlxx::UseQueryResult R = Q.use();
std::cout << R.fetch_row()[0] << std::endl;
}
return 0;
}

14
release
View File

@ -6,7 +6,7 @@
# Clang6 build:
# env DIST=bionic EXTRAPACKAGES="clang-6.0 libstdc++-8-dev lld-6.0 liblld-6.0-dev libclang-6.0-dev liblld-6.0" DEB_CC=clang-6.0 DEB_CXX=clang++-6.0 CMAKE_FLAGS=" -DLLVM_VERSION_POSTFIX=-6.0 -DNO_WERROR=1 " ./release
# Clang7 build:
# env DIST=unstable EXTRAPACKAGES="clang-7 libstdc++-8-dev lld-7 liblld-7-dev libclang-7-dev llvm-7-dev liblld-7" DEB_CC=clang-7 DEB_CXX=clang++-7 CMAKE_FLAGS=" -DLLVM_VERSION_POSTFIX=-7 -DNO_WERROR=1 " ./release
# env DIST=unstable EXTRAPACKAGES="clang-7 libstdc++-8-dev lld-7 liblld-7-dev libclang-7-dev liblld-7" DEB_CC=clang-7 DEB_CXX=clang++-7 CMAKE_FLAGS=" -DLLVM_VERSION_POSTFIX=-7 -DNO_WERROR=1 " ./release
# Clang6 without internal compiler (for low memory arm64):
# env DIST=bionic DISABLE_PARALLEL=1 EXTRAPACKAGES="clang-6.0 libstdc++-8-dev" DEB_CC=clang-6.0 DEB_CXX=clang++-6.0 CMAKE_FLAGS=" -DNO_WERROR=1 " ./release
@ -33,7 +33,7 @@ while [[ $1 == --* ]]
do
if [[ $1 == '--test' ]]; then
TEST='yes'
VERSION_POSTFIX+=-test
VERSION_POSTFIX+=+test
shift
elif [[ $1 == '--check-build-dependencies' ]]; then
DEBUILD_NODEPS_OPTIONS=""
@ -66,7 +66,7 @@ done
if [ -n "$SANITIZER" ]
then
CMAKE_BUILD_TYPE=$SANITIZER
VERSION_POSTFIX+=-${SANITIZER,,}
VERSION_POSTFIX+=+${SANITIZER,,}
# todo: нужно ли отключить libtcmalloc?
LIBTCMALLOC_OPTS="-DENABLE_TCMALLOC=0"
# GLIBC_COMPATIBILITY отключен по умолчанию
@ -75,14 +75,14 @@ then
EXTRAPACKAGES="$EXTRAPACKAGES clang-5.0 lld-5.0"
elif [[ $BUILD_TYPE == 'valgrind' ]]; then
LIBTCMALLOC_OPTS="-DENABLE_TCMALLOC=0"
VERSION_POSTFIX+=-$BUILD_TYPE
VERSION_POSTFIX+=+$BUILD_TYPE
elif [[ $BUILD_TYPE == 'debug' ]]; then
CMAKE_BUILD_TYPE=Debug
LIBTCMALLOC_OPTS="-DDEBUG_TCMALLOC=1"
VERSION_POSTFIX+=-$BUILD_TYPE
VERSION_POSTFIX+=+$BUILD_TYPE
fi
CMAKE_FLAGS=" $LIBTCMALLOC_OPTS -DCMAKE_BUILD_TYPE=$CMAKE_BUILD_TYPE -DENABLE_EMBEDDED_COMPILER=1 $CMAKE_FLAGS"
CMAKE_FLAGS=" $LIBTCMALLOC_OPTS -DCMAKE_BUILD_TYPE=$CMAKE_BUILD_TYPE $CMAKE_FLAGS"
export CMAKE_FLAGS
export EXTRAPACKAGES
@ -97,7 +97,7 @@ if [ -z "$USE_PBUILDER" ] ; then
-e DEB_CC=$DEB_CC -e DEB_CXX=$DEB_CXX -e CMAKE_FLAGS="$CMAKE_FLAGS" \
-b ${DEBUILD_NOSIGN_OPTIONS} ${DEBUILD_NODEPS_OPTIONS}
else
export DIST=${DIST:=artful}
export DIST=${DIST:=bionic}
export SET_BUILDRESULT=${SET_BUILDRESULT:=$CURDIR/..}
. $CURDIR/debian/.pbuilderrc

View File

@ -24,7 +24,7 @@ env TEST_RUN=${TEST_RUN=1} \
DEB_CC=${DEB_CC=$CC} DEB_CXX=${DEB_CXX=$CXX} \
CCACHE_SIZE=${CCACHE_SIZE:=4G} \
`# Disable all features` \
CMAKE_FLAGS="-DCMAKE_BUILD_TYPE=Debug -DUNBUNDLED=1 -DENABLE_UNWIND=0 -DENABLE_MYSQL=0 -DENABLE_CAPNP=0 -DENABLE_RDKAFKA=0 -DENABLE_EMBEDDED_COMPILER=1 -DCMAKE_C_FLAGS_ADD='-O0 -g0' -DCMAKE_CXX_FLAGS_ADD='-O0 -g0' $CMAKE_FLAGS" \
CMAKE_FLAGS="-DCMAKE_BUILD_TYPE=Debug -DUNBUNDLED=1 -DENABLE_UNWIND=0 -DENABLE_MYSQL=0 -DENABLE_CAPNP=0 -DENABLE_RDKAFKA=0 -DCMAKE_C_FLAGS_ADD='-O0 -g0' -DCMAKE_CXX_FLAGS_ADD='-O0 -g0' $CMAKE_FLAGS" \
`# Use all possible contrib libs from system` \
`# psmisc - killall` \
EXTRAPACKAGES="psmisc clang-5.0 lld-5.0 liblld-5.0-dev libclang-5.0-dev liblld-5.0 libc++abi-dev libc++-dev libboost-program-options-dev libboost-system-dev libboost-filesystem-dev libboost-thread-dev zlib1g-dev liblz4-dev libdouble-conversion-dev libsparsehash-dev librdkafka-dev libpoco-dev libsparsehash-dev libgoogle-perftools-dev libzstd-dev libre2-dev $EXTRAPACKAGES" \