Merge remote-tracking branch 'upstream/master' into fix25

proller 2019-09-12 17:08:22 +03:00
commit e9756dbd8f
33 changed files with 339 additions and 148 deletions

View File

@ -150,6 +150,7 @@ endif ()
if (LINKER_NAME)
message(STATUS "Using linker: ${LINKER_NAME} (selected from: LLD_PATH=${LLD_PATH}; GOLD_PATH=${GOLD_PATH}; COMPILER_POSTFIX=${COMPILER_POSTFIX})")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=${LINKER_NAME}")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -fuse-ld=${LINKER_NAME}")
endif ()
# Make sure the final executable has symbols exported
@ -230,6 +231,7 @@ endif ()
# Make this extra-checks for correct library dependencies.
if (NOT SANITIZE)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--no-undefined")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--no-undefined")
endif ()
include(cmake/dbms_glob_sources.cmake)

View File

@ -12,8 +12,7 @@
# https://youtrack.jetbrains.com/issue/CPP-2659
# https://youtrack.jetbrains.com/issue/CPP-870
string(TOLOWER "${CMAKE_COMMAND}" CMAKE_COMMAND_LOWER)
if (NOT ${CMAKE_COMMAND_LOWER} MATCHES "clion")
if (NOT DEFINED ENV{CLION_IDE})
find_program(NINJA_PATH ninja)
if (NINJA_PATH)
set(CMAKE_GENERATOR "Ninja" CACHE INTERNAL "" FORCE)

View File

@ -18,3 +18,4 @@ ClickHouse is an open-source column-oriented database management system that all
* [ClickHouse Meetup in Hong Kong](https://www.meetup.com/Hong-Kong-Machine-Learning-Meetup/events/263580542/) on October 17.
* [ClickHouse Meetup in Shenzhen](https://www.huodongxing.com/event/3483759917300) on October 20.
* [ClickHouse Meetup in Shanghai](https://www.huodongxing.com/event/4483760336000) on October 27.
* [ClickHouse Meetup in Tokyo](https://clickhouse.connpass.com/event/147001/) on November 14.

View File

@ -1,20 +1,26 @@
option (USE_CAPNP "Enable Cap'n Proto" ON)
option (ENABLE_CAPNP "Enable Cap'n Proto" ON)
if (USE_CAPNP)
option (USE_INTERNAL_CAPNP_LIBRARY "Set to FALSE to use system capnproto library instead of bundled" ${NOT_UNBUNDLED})
if (ENABLE_CAPNP)
# FIXME: refactor to use `add_library( IMPORTED)` if possible.
if (NOT USE_INTERNAL_CAPNP_LIBRARY)
option (USE_INTERNAL_CAPNP_LIBRARY "Set to FALSE to use system capnproto library instead of bundled" ${NOT_UNBUNDLED})
# FIXME: refactor to use `add_library( IMPORTED)` if possible.
if (NOT USE_INTERNAL_CAPNP_LIBRARY)
find_library (KJ kj)
find_library (CAPNP capnp)
find_library (CAPNPC capnpc)
set (CAPNP_LIBRARIES ${CAPNPC} ${CAPNP} ${KJ})
else ()
else ()
add_subdirectory(contrib/capnproto-cmake)
set (CAPNP_LIBRARIES capnpc)
endif ()
message (STATUS "Using capnp: ${CAPNP_LIBRARIES}")
endif ()
if (CAPNP_LIBRARIES)
set (USE_CAPNP 1)
endif ()
endif ()
message (STATUS "Using capnp: ${CAPNP_LIBRARIES}")

View File

@ -143,15 +143,15 @@ add_subdirectory(src/Common/Config)
set (all_modules)
macro(add_object_library name common_path)
if (MAKE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES)
add_glob(dbms_headers RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} ${common_path}/*.h)
add_glob(dbms_sources ${common_path}/*.cpp ${common_path}/*.c ${common_path}/*.h)
else ()
list (APPEND all_modules ${name})
add_glob(${name}_headers RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} ${common_path}/*.h)
add_glob(${name}_sources ${common_path}/*.cpp ${common_path}/*.c ${common_path}/*.h)
if (MAKE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES)
add_library(${name} OBJECT ${${name}_sources} ${${name}_headers})
else ()
add_library(${name} SHARED ${${name}_sources} ${${name}_headers})
# force all split libs to be linked
target_link_options(${name} PUBLIC "-Wl,--no-as-needed")
target_link_libraries (${name} PRIVATE -Wl,--unresolved-symbols=ignore-all)
endif ()
endmacro()
@ -177,15 +177,15 @@ add_object_library(clickhouse_processors_transforms src/Processors/Transforms)
add_object_library(clickhouse_processors_sources src/Processors/Sources)
if (MAKE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES)
foreach (module ${all_modules})
list (APPEND dbms_sources $<TARGET_OBJECTS:${module}>)
endforeach ()
add_library(dbms STATIC ${dbms_headers} ${dbms_sources})
add_library (dbms STATIC ${dbms_headers} ${dbms_sources})
set (all_modules dbms)
else()
add_library(dbms SHARED ${dbms_headers} ${dbms_sources})
add_library (dbms SHARED ${dbms_headers} ${dbms_sources})
target_link_libraries (dbms PUBLIC ${all_modules})
list (APPEND all_modules dbms)
# force all split libs to be linked
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--no-as-needed")
endif ()
list (APPEND all_modules dbms)
macro (dbms_target_include_directories)
foreach (module ${all_modules})
@ -389,7 +389,8 @@ if (USE_PARQUET)
endif ()
if (OPENSSL_CRYPTO_LIBRARY)
dbms_target_link_libraries(PRIVATE ${OPENSSL_CRYPTO_LIBRARY})
dbms_target_link_libraries (PRIVATE ${OPENSSL_CRYPTO_LIBRARY})
target_link_libraries (clickhouse_common_io PRIVATE ${OPENSSL_CRYPTO_LIBRARY})
endif ()
dbms_target_include_directories (SYSTEM BEFORE PRIVATE ${DIVIDE_INCLUDE_DIR})

View File

@ -566,6 +566,14 @@ private:
<< " server version " << server_version
<< " revision " << server_revision
<< "." << std::endl << std::endl;
if (std::make_tuple(VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
< std::make_tuple(server_version_major, server_version_minor, server_version_patch))
{
std::cout << "ClickHouse client version is older than ClickHouse server. "
<< "It may lack support for new features."
<< std::endl << std::endl;
}
}
}
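
The version check added above relies on std::tuple's built-in lexicographic comparison rather than comparing each component by hand. A minimal standalone illustration; the version numbers 19.13.3 and 19.14.1 are hypothetical and not taken from this diff:

#include <iostream>
#include <tuple>

int main()
{
    // Tuples compare element by element, so 19.13.x orders before 19.14.y
    // and the "client is older than server" warning branch would fire.
    constexpr auto client_version = std::make_tuple(19, 13, 3);
    constexpr auto server_version = std::make_tuple(19, 14, 1);

    if (client_version < server_version)
        std::cout << "ClickHouse client version is older than ClickHouse server.\n";
}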

View File

@ -1,5 +1,5 @@
set(CLICKHOUSE_COPIER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopier.cpp)
set(CLICKHOUSE_COPIER_LINK PRIVATE clickhouse_common_zookeeper clickhouse_parsers clickhouse_functions clickhouse_table_functions clickhouse_aggregate_functions clickhouse_dictionaries string_utils PUBLIC daemon)
set(CLICKHOUSE_COPIER_LINK PRIVATE clickhouse_common_zookeeper clickhouse_parsers clickhouse_functions clickhouse_table_functions clickhouse_aggregate_functions clickhouse_dictionaries string_utils ${Poco_XML_LIBRARY} PUBLIC daemon)
set(CLICKHOUSE_COPIER_INCLUDE SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
clickhouse_program_add(copier)

View File

@ -1,3 +1,5 @@
#include <Common/config.h>
#if USE_POCO_NETSSL
#include "MySQLHandler.h"
#include <limits>
@ -301,3 +303,4 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
}
}
#endif

View File

@ -1,4 +1,6 @@
#pragma once
#include <Common/config.h>
#if USE_POCO_NETSSL
#include <Poco/Net/TCPServerConnection.h>
#include <Poco/Net/SecureStreamSocket.h>
@ -56,3 +58,4 @@ private:
};
}
#endif

View File

@ -1,3 +1,5 @@
#include <Common/config.h>
#if USE_POCO_NETSSL
#include <Common/OpenSSLHelpers.h>
#include <Poco/Crypto/X509Certificate.h>
#include <Poco/Net/SSLManager.h>
@ -122,3 +124,4 @@ Poco::Net::TCPServerConnection * MySQLHandlerFactory::createConnection(const Poc
}
}
#endif

View File

@ -1,5 +1,8 @@
#pragma once
#include <Common/config.h>
#if USE_POCO_NETSSL
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <atomic>
#include <openssl/rsa.h>
@ -37,3 +40,4 @@ public:
};
}
#endif

View File

@ -88,10 +88,11 @@ public:
if (is_large)
{
toLarge();
UInt32 cardinality;
readBinary(cardinality, in);
db_roaring_bitmap_add_many(in, rb, cardinality);
std::string s;
readStringBinary(s,in);
rb = roaring_bitmap_portable_deserialize(s.c_str());
for (const auto & x : small) //merge from small
roaring_bitmap_add(rb, x.getValue());
}
else
small.read(in);
@ -103,9 +104,10 @@ public:
if (isLarge())
{
UInt32 cardinality = roaring_bitmap_get_cardinality(rb);
writePODBinary(cardinality, out);
db_ra_to_uint32_array(out, &rb->high_low_container);
uint32_t expectedsize = roaring_bitmap_portable_size_in_bytes(rb);
std::string s(expectedsize,0);
roaring_bitmap_portable_serialize(rb, const_cast<char*>(s.data()));
writeStringBinary(s,out);
}
else
small.write(out);
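
The read/write paths above switch from a hand-rolled cardinality-plus-values encoding to CRoaring's portable serialization. For context, the same round trip can be exercised in isolation; this is a sketch against the plain CRoaring C API (the <roaring/roaring.h> include path and roaring_bitmap_create/free are assumed from that API, the portable_* calls are the ones used in the diff):

#include <cstdint>
#include <iostream>
#include <string>
#include <roaring/roaring.h>

int main()
{
    roaring_bitmap_t * rb = roaring_bitmap_create();
    for (uint32_t i = 0; i < 1000; i += 3)
        roaring_bitmap_add(rb, i);

    // Serialize into a std::string, mirroring the write() path above.
    std::string s(roaring_bitmap_portable_size_in_bytes(rb), 0);
    roaring_bitmap_portable_serialize(rb, const_cast<char *>(s.data()));

    // Deserialize from the buffer, mirroring the read() path above.
    roaring_bitmap_t * rb2 = roaring_bitmap_portable_deserialize(s.c_str());
    std::cout << roaring_bitmap_get_cardinality(rb2) << std::endl; // prints 334

    roaring_bitmap_free(rb);
    roaring_bitmap_free(rb2);
}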

View File

@ -1,3 +1,5 @@
#include <Common/config.h>
#if USE_POCO_NETSSL
#include "OpenSSLHelpers.h"
#include <ext/scope_guard.h>
#include <openssl/err.h>
@ -16,3 +18,4 @@ String getOpenSSLErrors()
}
}
#endif

View File

@ -1,4 +1,6 @@
#pragma once
#include <Common/config.h>
#if USE_POCO_NETSSL
#include <Core/Types.h>
@ -10,3 +12,4 @@ namespace DB
String getOpenSSLErrors();
}
#endif

View File

@ -4,8 +4,6 @@
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <cassert>
namespace ProfileEvents
{
@ -35,22 +33,48 @@ namespace ErrorCodes
}
/** A single-use object that represents lock's ownership
* For the purpose of exception safety guarantees LockHolder is to be used in two steps:
* 1. Create an instance (allocating all the memory needed)
* 2. Associate the instance with the lock (attach to the lock and locking request group)
*/
class RWLockImpl::LockHolderImpl
{
bool bound{false};
Type lock_type;
String query_id;
CurrentMetrics::Increment active_client_increment;
RWLock parent;
GroupsContainer::iterator it_group;
ClientsContainer::iterator it_client;
QueryIdToHolder::key_type query_id;
CurrentMetrics::Increment active_client_increment;
LockHolderImpl(RWLock && parent, GroupsContainer::iterator it_group, ClientsContainer::iterator it_client);
public:
LockHolderImpl(const LockHolderImpl & other) = delete;
LockHolderImpl& operator=(const LockHolderImpl & other) = delete;
/// Implicit memory allocation for query_id is done here
LockHolderImpl(const String & query_id_, Type type)
: lock_type{type}, query_id{query_id_},
active_client_increment{
type == Type::Read ? CurrentMetrics::RWLockActiveReaders : CurrentMetrics::RWLockActiveWriters}
{
}
~LockHolderImpl();
private:
/// A separate method which binds the lock holder to the owned lock
/// N.B. It is very important that this method produces no allocations
bool bind_with(RWLock && parent_, GroupsContainer::iterator it_group_) noexcept
{
if (bound)
return false;
it_group = it_group_;
parent = std::move(parent_);
++it_group->refererrs;
bound = true;
return true;
}
friend class RWLockImpl;
};
@ -62,29 +86,33 @@ namespace
class QueryLockInfo
{
private:
std::mutex mutex;
mutable std::mutex mutex;
std::map<std::string, size_t> queries;
public:
void add(const String & query_id)
{
std::lock_guard lock(mutex);
++queries[query_id];
const auto res = queries.emplace(query_id, 1); // may throw
if (!res.second)
++res.first->second;
}
void remove(const String & query_id)
void remove(const String & query_id) noexcept
{
std::lock_guard lock(mutex);
auto it = queries.find(query_id);
assert(it != queries.end());
if (--it->second == 0)
queries.erase(it);
const auto query_it = queries.find(query_id);
if (query_it != queries.cend() && --query_it->second == 0)
queries.erase(query_it);
}
void check(const String & query_id)
void check(const String & query_id) const
{
std::lock_guard lock(mutex);
if (queries.count(query_id))
if (queries.find(query_id) != queries.cend())
throw Exception("Possible deadlock avoided. Client should retry.", ErrorCodes::DEADLOCK_AVOIDED);
}
};
@ -93,8 +121,16 @@ namespace
}
/** To guarantee that we do not get any piece of our data corrupted:
* 1. Perform all actions that include allocations before changing lock's internal state
* 2. Roll back any changes that make the state inconsistent
*
* Note: "SM" in the commentaries below stands for STATE MODIFICATION
*/
RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id)
{
const bool request_has_query_id = query_id != NO_QUERY;
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
CurrentMetrics::Increment waiting_client_increment((type == Read) ? CurrentMetrics::RWLockWaitingReaders
: CurrentMetrics::RWLockWaitingWriters);
@ -106,29 +142,39 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
: ProfileEvents::RWLockWritersWaitMilliseconds, watch.elapsedMilliseconds());
};
GroupsContainer::iterator it_group;
ClientsContainer::iterator it_client;
/// This object is placed above unique_lock, because it may lock in destructor.
LockHolder res;
auto lock_holder = std::make_shared<LockHolderImpl>(query_id, type);
std::unique_lock lock(mutex);
/// Check if the same query is acquiring previously acquired lock
if (query_id != RWLockImpl::NO_QUERY)
/// The FastPath:
/// Check if the same query_id already holds the required lock in which case we can proceed without waiting
if (request_has_query_id)
{
auto it_query = query_id_to_holder.find(query_id);
if (it_query != query_id_to_holder.end())
res = it_query->second.lock();
}
const auto it_query = owner_queries.find(query_id);
if (it_query != owner_queries.end())
{
const auto current_owner_group = queue.begin();
if (res)
{
/// XXX: it means we can't upgrade lock from read to write - with proper waiting!
if (type != Read || res->it_group->type != Read)
throw Exception("Attempt to acquire exclusive lock recursively", ErrorCodes::LOGICAL_ERROR);
else
return res;
/// XXX: it means we can't upgrade lock from read to write!
if (type == Write)
throw Exception(
"RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked",
ErrorCodes::LOGICAL_ERROR);
if (current_owner_group->type == Write)
throw Exception(
"RWLockImpl::getLock(): RWLock is already locked in exclusive mode",
ErrorCodes::LOGICAL_ERROR);
/// N.B. Type is Read here, query_id is not empty and it_query is a valid iterator
all_read_locks.add(query_id); /// SM1: may throw on insertion (nothing to roll back)
++it_query->second; /// SM2: nothrow
lock_holder->bind_with(shared_from_this(), current_owner_group); /// SM3: nothrow
finalize_metrics();
return lock_holder;
}
}
/** If the query already has any active read lock and tries to acquire another read lock
@ -148,86 +194,106 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
if (type == Type::Write || queue.empty() || queue.back().type == Type::Write)
{
if (type == Type::Read && !queue.empty() && queue.back().type == Type::Write && query_id != RWLockImpl::NO_QUERY)
if (type == Type::Read && request_has_query_id && !queue.empty())
all_read_locks.check(query_id);
/// Create new group of clients
it_group = queue.emplace(queue.end(), type);
/// Create a new group of locking requests
queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
}
else
{
/// Will append myself to last group
it_group = std::prev(queue.end());
if (it_group != queue.begin() && query_id != RWLockImpl::NO_QUERY)
else if (request_has_query_id && queue.size() > 1)
all_read_locks.check(query_id);
}
/// Append myself to the end of chosen group
auto & clients = it_group->clients;
try
{
it_client = clients.emplace(clients.end(), type);
}
catch (...)
{
/// Remove group if it was the first client in the group and an error occurred
if (clients.empty())
queue.erase(it_group);
throw;
}
GroupsContainer::iterator it_group = std::prev(queue.end());
res.reset(new LockHolderImpl(shared_from_this(), it_group, it_client));
/// We need to reference the associated group before waiting to guarantee
/// that this group does not get deleted prematurely
++it_group->refererrs;
/// Wait a notification until we will be the only in the group.
it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); });
/// Insert myself (weak_ptr to the holder) to queries set to implement recursive lock
if (query_id != RWLockImpl::NO_QUERY)
{
query_id_to_holder.emplace(query_id, res);
--it_group->refererrs;
if (request_has_query_id)
{
try
{
if (type == Type::Read)
all_read_locks.add(query_id);
all_read_locks.add(query_id); /// SM2: may throw on insertion
/// and is safe to roll back unconditionally
const auto emplace_res =
owner_queries.emplace(query_id, 1); /// SM3: may throw on insertion
if (!emplace_res.second)
++emplace_res.first->second; /// SM4: nothrow
}
res->query_id = query_id;
catch (...)
{
/// Methods std::list<>::emplace_back() and std::unordered_map<>::emplace() provide strong exception safety
/// We only need to roll back the changes to these objects: all_read_locks and the locking queue
if (type == Type::Read)
all_read_locks.remove(query_id); /// Rollback(SM2): nothrow
if (it_group->refererrs == 0)
{
const auto next = queue.erase(it_group); /// Rollback(SM1): nothrow
if (next != queue.end())
next->cv.notify_all();
}
throw;
}
}
lock_holder->bind_with(shared_from_this(), it_group); /// SM: nothrow
finalize_metrics();
return res;
return lock_holder;
}
/** The sequence points of acquiring lock's ownership by an instance of LockHolderImpl:
* 1. all_read_locks is updated
* 2. owner_queries is updated
* 3. request group is updated by LockHolderImpl which in turn becomes "bound"
*
* If by the time when destructor of LockHolderImpl is called the instance has been "bound",
* it is guaranteed that all three steps have been executed successfully and the resulting state is consistent.
* With the mutex locked the order of steps to restore the lock's state can be arbitrary
*
* We do not employ try-catch: if something bad happens, there is nothing we can do =(
*/
RWLockImpl::LockHolderImpl::~LockHolderImpl()
{
if (!bound || parent == nullptr)
return;
std::lock_guard lock(parent->mutex);
/// Remove weak_ptrs to the holder, since there are no owners of the current lock
parent->query_id_to_holder.erase(query_id);
/// The associated group must exist (and be the beginning of the queue?)
if (parent->queue.empty() || it_group != parent->queue.begin())
return;
if (*it_client == RWLockImpl::Read && query_id != RWLockImpl::NO_QUERY)
all_read_locks.remove(query_id);
/// Removes myself from client list of our group
it_group->clients.erase(it_client);
/// Remove the group if we were the last client and notify the next group
if (it_group->clients.empty())
/// If query_id is not empty it must be listed in parent->owner_queries
if (query_id != RWLockImpl::NO_QUERY)
{
auto & parent_queue = parent->queue;
parent_queue.erase(it_group);
const auto owner_it = parent->owner_queries.find(query_id);
if (owner_it != parent->owner_queries.end())
{
if (--owner_it->second == 0) /// SM: nothrow
parent->owner_queries.erase(owner_it); /// SM: nothrow
if (!parent_queue.empty())
parent_queue.front().cv.notify_all();
if (lock_type == RWLockImpl::Read)
all_read_locks.remove(query_id); /// SM: nothrow
}
}
/// If we are the last remaining referrer, remove the group and notify the next group
if (--it_group->refererrs == 0) /// SM: nothrow
{
const auto next = parent->queue.erase(it_group); /// SM: nothrow
if (next != parent->queue.end())
next->cv.notify_all();
}
}
RWLockImpl::LockHolderImpl::LockHolderImpl(RWLock && parent_, RWLockImpl::GroupsContainer::iterator it_group_,
RWLockImpl::ClientsContainer::iterator it_client_)
: parent{std::move(parent_)}, it_group{it_group_}, it_client{it_client_},
active_client_increment{(*it_client == RWLockImpl::Read) ? CurrentMetrics::RWLockActiveReaders
: CurrentMetrics::RWLockActiveWriters}
{
}
}
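
The comments above spell out the new protocol: a lock holder is allocated first, then bound to the lock without further allocations, and a query may re-acquire a read lock it already owns through its query_id (the fast path). A minimal usage sketch; the RWLockImpl::create() factory, the <Common/RWLock.h> include path and the query id string are assumptions, while getLock() and the Read/Write types come from the diff:

#include <cassert>
#include <Common/RWLock.h>

int main()
{
    auto lock = DB::RWLockImpl::create();

    /// The same query_id can take the read lock recursively via the fast path.
    auto first = lock->getLock(DB::RWLockImpl::Read, "query-1");
    auto second = lock->getLock(DB::RWLockImpl::Read, "query-1");
    assert(first && second);

    /// Asking for a write lock while "query-1" already holds a read lock
    /// would now throw a LOGICAL_ERROR instead of deadlocking:
    /// auto bad = lock->getLock(DB::RWLockImpl::Write, "query-1");
}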

View File

@ -8,6 +8,7 @@
#include <condition_variable>
#include <map>
#include <string>
#include <unordered_map>
namespace DB
@ -53,25 +54,24 @@ private:
struct Group;
using GroupsContainer = std::list<Group>;
using ClientsContainer = std::list<Type>;
using QueryIdToHolder = std::map<String, std::weak_ptr<LockHolderImpl>>;
using OwnerQueryIds = std::unordered_map<String, size_t>;
/// Group of clients that should be executed concurrently
/// i.e. a group could contain several readers, but only one writer
/// Group of locking requests that should be granted concurrently
/// i.e. a group can contain several readers, but only one writer
struct Group
{
// FIXME: there is only redundant |type| information inside |clients|.
const Type type;
ClientsContainer clients;
size_t refererrs;
std::condition_variable cv; /// all clients of the group wait group condvar
std::condition_variable cv; /// all locking requests of the group wait on this condvar
explicit Group(Type type_) : type{type_} {}
explicit Group(Type type_) : type{type_}, refererrs{0} {}
};
mutable std::mutex mutex;
GroupsContainer queue;
QueryIdToHolder query_id_to_holder;
OwnerQueryIds owner_queries;
mutable std::mutex mutex;
};

View File

@ -11,6 +11,7 @@
#endif
#include <gtest/gtest.h>
#include <chrono>
namespace DB

View File

@ -1,3 +1,5 @@
#include <Common/config.h>
#if USE_POCO_NETSSL
#include <IO/WriteBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
@ -100,3 +102,4 @@ size_t getLengthEncodedStringSize(const String & s)
}
}
#endif

View File

@ -1,4 +1,6 @@
#pragma once
#include <Common/config.h>
#if USE_POCO_NETSSL
#include <ext/scope_guard.h>
#include <openssl/pem.h>
@ -1075,3 +1077,4 @@ private:
}
}
#endif

View File

@ -28,8 +28,6 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
break;
to.write(block);
if (!block.rows())
to.flush();
progress(block);
}

View File

@ -1,3 +1,4 @@
#include <Common/config.h>
#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Core/Settings.h>
@ -312,7 +313,9 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorODBCDriver(*this);
registerOutputFormatProcessorODBCDriver2(*this);
registerOutputFormatProcessorNull(*this);
#if USE_POCO_NETSSL
registerOutputFormatProcessorMySQLWrite(*this);
#endif
}
FormatFactory & FormatFactory::instance()

View File

@ -15,7 +15,6 @@ if(USE_HYPERSCAN)
target_include_directories(clickhouse_functions_url SYSTEM PRIVATE ${HYPERSCAN_INCLUDE_DIR})
endif()
include(${ClickHouse_SOURCE_DIR}/cmake/find_gperf.cmake)
if (USE_GPERF)
# Only for regenerate
add_custom_target(generate-tldlookup-gperf ./tldLookup.sh

View File

@ -38,6 +38,7 @@
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Parsers/ASTWatchQuery.h>
namespace ProfileEvents
{
@ -658,6 +659,18 @@ void executeQuery(
if (set_query_id)
set_query_id(context.getClientInfo().current_query_id);
if (ast->as<ASTWatchQuery>())
{
/// For Watch query, flush data if block is empty (to send data to client).
auto flush_callback = [&out](const Block & block)
{
if (block.rows() == 0)
out->flush();
};
copyData(*streams.in, *out, [](){ return false; }, std::move(flush_callback));
}
else
copyData(*streams.in, *out);
}
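
The new branch is taken only for WATCH queries, where the result stream stays open and empty blocks arrive between updates; the callback flushes buffered output when such a block appears so the client receives data promptly. A hypothetical client-side illustration, assuming the experimental LIVE VIEW feature, the allow_experimental_live_view setting name and the table names (only WATCH itself is referenced in this diff):

SET allow_experimental_live_view = 1;
CREATE TABLE lv_src (x UInt64) ENGINE = MergeTree ORDER BY x;
CREATE LIVE VIEW lv AS SELECT count() FROM lv_src;
WATCH lv; -- with the flush above, updates reach the client without waiting for a full block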

View File

@ -1,3 +1,5 @@
#include <Common/config.h>
#if USE_POCO_NETSSL
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Core/MySQLProtocol.h>
@ -116,3 +118,4 @@ void registerOutputFormatProcessorMySQLWrite(FormatFactory & factory)
}
}
#endif

View File

@ -1,4 +1,6 @@
#pragma once
#include <Common/config.h>
#if USE_POCO_NETSSL
#include <Processors/Formats/IRowOutputFormat.h>
#include <Core/Block.h>
@ -40,3 +42,4 @@ private:
};
}
#endif

View File

@ -150,8 +150,6 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
return true;
}
put_delimiter = (delimiter != 0);
if (current == messages.end())
{
if (intermediate_commit)
@ -183,6 +181,7 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
// XXX: very fishy place with const casting.
auto new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
BufferBase::set(new_position, current->get_payload().get_size(), 0);
put_delimiter = (delimiter != 0);
/// Since we can poll more messages than we already processed - commit only processed messages.
consumer->store_offset(*current);

View File

@ -248,6 +248,21 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(180)
def test_kafka_select_empty(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'empty',
kafka_group_name = 'empty',
kafka_format = 'TSV',
kafka_row_delimiter = '\\n';
''')
assert int(instance.query('SELECT count() FROM test.kafka')) == 0
@pytest.mark.timeout(180)
def test_kafka_json_without_delimiter(kafka_cluster):
instance.query('''

View File

@ -0,0 +1,2 @@
0
10

View File

@ -0,0 +1,5 @@
drop table if exists tab;
create table tab (x UInt64) engine = MergeTree order by tuple();
insert into tab select number as n from numbers(20) any inner join (select number * 10 as n from numbers(2)) using(n) settings any_join_distinct_right_table_keys = 1, max_block_size = 5;
select * from tab order by x;

View File

@ -0,0 +1,24 @@
Set any_join_distinct_right_table_keys=1;
DROP TABLE IF EXISTS test_insert_t1;
DROP TABLE IF EXISTS test_insert_t2;
DROP TABLE IF EXISTS test_insert_t3;
CREATE TABLE test_insert_t1 (`dt` Date, `uid` String, `name` String, `city` String) ENGINE = MergeTree PARTITION BY toYYYYMMDD(dt) ORDER BY name SETTINGS index_granularity = 8192;
CREATE TABLE test_insert_t2 (`dt` Date, `uid` String) ENGINE = MergeTree PARTITION BY toYYYYMMDD(dt) ORDER BY uid SETTINGS index_granularity = 8192;
CREATE TABLE test_insert_t3 (`dt` Date, `uid` String, `name` String, `city` String) ENGINE = MergeTree PARTITION BY toYYYYMMDD(dt) ORDER BY name SETTINGS index_granularity = 8192;
INSERT INTO test_insert_t1 SELECT '2019-09-01',toString(number),toString(rand()),toString(rand()) FROM system.numbers WHERE number > 10 limit 1000000;
INSERT INTO test_insert_t2 SELECT '2019-09-01',toString(number) FROM system.numbers WHERE number >=0 limit 200;
INSERT INTO test_insert_t2 SELECT '2019-09-01',toString(number) FROM system.numbers WHERE number >=100000 limit 200;
INSERT INTO test_insert_t2 SELECT '2019-09-01',toString(number) FROM system.numbers WHERE number >=300000 limit 200;
INSERT INTO test_insert_t2 SELECT '2019-09-01',toString(number) FROM system.numbers WHERE number >=500000 limit 200;
INSERT INTO test_insert_t2 SELECT '2019-09-01',toString(number) FROM system.numbers WHERE number >=700000 limit 200;
INSERT INTO test_insert_t2 SELECT '2019-09-01',toString(number) FROM system.numbers WHERE number >=900000 limit 200;
INSERT INTO test_insert_t3 SELECT '2019-09-01', uid, name, city FROM ( SELECT dt, uid, name, city FROM test_insert_t1 WHERE dt = '2019-09-01') t1 GLOBAL ANY INNER JOIN (SELECT uid FROM test_insert_t2 WHERE dt = '2019-09-01') t2 ON t1.uid=t2.uid;
SELECT count(*) FROM test_insert_t3;
DROP TABLE test_insert_t1;
DROP TABLE test_insert_t2;
DROP TABLE test_insert_t3;

View File

@ -894,4 +894,19 @@ Error count of each replica is capped at this value, preventing a single replica
- [Table engine Distributed](../../operations/table_engines/distributed.md)
- [`distributed_replica_error_half_life`](#settings-distributed_replica_error_half_life)
## os_thread_priority {#setting-os_thread_priority}
Sets the priority ([nice](https://en.wikipedia.org/wiki/Nice_(Unix))) for threads that execute queries. The OS scheduler considers this priority when choosing the next thread to run on each available CPU core.
!!! warning "Warning"
To use this setting, you need to set the `CAP_SYS_NICE` capability. The `clickhouse-server` package sets it up during installation. Some virtual environments don't allow you to set the `CAP_SYS_NICE` capability; in that case, `clickhouse-server` shows a message about it at startup.
Possible values:
You can set values in the `[-20, 19]` range.
A lower value means a higher priority. Threads with low `nice` priority values are executed more frequently than threads with high values. High values are preferable for long-running non-interactive queries because they can quickly give up resources in favour of short interactive queries when those arrive.
Default value: 0.
[Original article](https://clickhouse.yandex/docs/en/operations/settings/settings/) <!-- hide -->
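
A minimal usage illustration for the setting documented above (the value 10 and the query are arbitrary examples, not part of the docs):

SET os_thread_priority = 10; -- lower priority for a long, non-interactive query
SELECT sum(number) FROM numbers(1000000000);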

View File

@ -62,7 +62,7 @@ Columns:
Please note that `errors_count` is updated once per query to the cluster, but `estimated_recovery_time` is recalculated on demand. So there can be a case of non-zero `errors_count` and zero `estimated_recovery_time`; in that case, the next query will reset `errors_count` to zero and try to use the replica as if it had no errors.
** See also **
**See also**
- [Table engine Distributed](table_engines/distributed.md)
- [distributed_replica_error_cap setting](settings/settings.md#settings-distributed_replica_error_cap)
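
An illustrative query over the columns discussed above (`errors_count` and `estimated_recovery_time` come from the text; the `cluster` and `host_name` columns are assumed):

SELECT cluster, host_name, errors_count, estimated_recovery_time
FROM system.clusters
WHERE errors_count > 0;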