Merge branch 'fix-debian-descriptions' into cleanup-build

This commit is contained in:
Yatsishin Ilya 2022-01-18 17:53:18 +00:00
commit 3e1d0fdd79
109 changed files with 1835 additions and 1091 deletions

View File

@ -2,7 +2,7 @@
#include <base/scope_guard.h>
#include <base/logger_useful.h>
#include <Common/MemoryTracker.h>
#include <Common/LockMemoryExceptionInThread.h>
/// Same as SCOPE_EXIT() but block the MEMORY_LIMIT_EXCEEDED errors.
///
@ -12,8 +12,7 @@
///
/// NOTE: it should be used with caution.
#define SCOPE_EXIT_MEMORY(...) SCOPE_EXIT( \
MemoryTracker::LockExceptionInThread \
lock_memory_tracker(VariableContext::Global); \
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); \
__VA_ARGS__; \
)
@ -57,8 +56,7 @@
#define SCOPE_EXIT_MEMORY_SAFE(...) SCOPE_EXIT( \
try \
{ \
MemoryTracker::LockExceptionInThread \
lock_memory_tracker(VariableContext::Global); \
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global); \
__VA_ARGS__; \
} \
catch (...) \

View File

@ -51,6 +51,7 @@
#include <Common/getExecutablePath.h>
#include <Common/getHashOfLoadedBinary.h>
#include <Common/Elf.h>
#include <Common/setThreadName.h>
#include <filesystem>
#include <loggers/OwnFormattingChannel.h>

View File

@ -10,6 +10,8 @@
#include <Poco/Message.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <base/getThreadId.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/IO.h>
@ -57,7 +59,7 @@ void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)
/// but let's log it into the stderr at least.
catch (...)
{
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
const std::string & exception_message = getCurrentExceptionMessage(true);
const std::string & message = msg.getText();

6
debian/.pbuilderrc vendored
View File

@ -104,8 +104,7 @@ ALLOWUNTRUSTED=${SET_ALLOWUNTRUSTED:=${ALLOWUNTRUSTED}}
if $(echo ${DEBIAN_SUITES[@]} | grep -q $DIST); then
# Debian configuration
OSNAME=debian
#MIRRORSITE=${SET_MIRRORSITE="http://deb.debian.org/$OSNAME/"}
MIRRORSITE=${SET_MIRRORSITE="http://mirror.yandex.ru/$OSNAME/"}
MIRRORSITE=${SET_MIRRORSITE="http://deb.debian.org/$OSNAME/"}
COMPONENTS="main contrib non-free"
if $(echo "$STABLE_CODENAME stable" | grep -q $DIST); then
OTHERMIRROR="$OTHERMIRROR | deb $MIRRORSITE $STABLE_BACKPORTS_SUITE $COMPONENTS"
@ -125,8 +124,7 @@ elif $(echo ${UBUNTU_SUITES[@]} | grep -q $DIST); then
OSNAME=ubuntu
if [[ "$ARCH" == "amd64" || "$ARCH" == "i386" ]]; then
#MIRRORSITE=${SET_MIRRORSITE="http://archive.ubuntu.com/$OSNAME/"}
MIRRORSITE=${SET_MIRRORSITE="http://mirror.yandex.ru/$OSNAME/"}
MIRRORSITE=${SET_MIRRORSITE="http://archive.ubuntu.com/$OSNAME/"}
else
MIRRORSITE=${SET_MIRRORSITE="http://ports.ubuntu.com/ubuntu-ports/"}
fi

View File

@ -5,7 +5,7 @@
# Default-Stop: 0 1 6
# Should-Start: $time $network
# Should-Stop: $network
# Short-Description: Yandex clickhouse-server daemon
# Short-Description: clickhouse-server daemon
### END INIT INFO
#
# NOTES:

6
debian/control vendored
View File

@ -18,7 +18,7 @@ Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-common-static (= ${binar
Replaces: clickhouse-compressor
Conflicts: clickhouse-compressor
Description: Client binary for ClickHouse
Yandex ClickHouse is a column-oriented database management system
ClickHouse is a column-oriented database management system
that allows generating analytical data reports in real time.
.
This package provides clickhouse-client , clickhouse-local and clickhouse-benchmark
@ -30,7 +30,7 @@ Suggests: clickhouse-common-static-dbg
Replaces: clickhouse-common, clickhouse-server-base
Provides: clickhouse-common, clickhouse-server-base
Description: Common files for ClickHouse
Yandex ClickHouse is a column-oriented database management system
ClickHouse is a column-oriented database management system
that allows generating analytical data reports in real time.
.
This package provides common files for both clickhouse server and client
@ -42,7 +42,7 @@ Recommends: libcap2-bin
Replaces: clickhouse-server-common, clickhouse-server-base
Provides: clickhouse-server-common
Description: Server binary for ClickHouse
Yandex ClickHouse is a column-oriented database management system
ClickHouse is a column-oriented database management system
that allows generating analytical data reports in real time.
.
This package provides clickhouse common configuration files

View File

@ -3,14 +3,14 @@ toc_priority: 53
toc_title: USE
---
# USE Statement {#use}
# USE 语句 {#use}
``` sql
USE db
```
Lets you set the current database for the session.
用于设置会话的当前数据库。
The current database is used for searching for tables if the database is not explicitly defined in the query with a dot before the table name.
如果查询语句中没有在表名前面以加点的方式指明数据库名, 则用当前数据库进行搜索。
This query cant be made when using the HTTP protocol, since there is no concept of a session.
使用 HTTP 协议时无法进行此查询,因为没有会话的概念。

View File

@ -1018,6 +1018,9 @@ toc_title: '2021'
### ClickHouse release 21.6, 2021-06-05
#### Backward Incompatible Change
* uniqState / uniqHLL12State / uniqCombinedState / uniqCombined64State produce incompatible states with `UUID` type. [#33607](https://github.com/ClickHouse/ClickHouse/issues/33607).
#### Upgrade Notes
* `zstd` compression library is updated to v1.5.0. You may get messages about "checksum does not match" in replication. These messages are expected due to update of compression algorithm and you can ignore them. These messages are informational and do not indicate any kinds of undesired behaviour.

View File

@ -1,8 +1,32 @@
---
machine_translated: true
machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
toc_folder_title: "\u8BED\u53E5"
toc_folder_title: SQL 语句
toc_hidden: true
toc_priority: 31
---
# ClickHouse SQL 语句 {#clickhouse-sql-statements}
语句表示可以使用 SQL 查询执行的各种操作。每种类型的语句都有自己的语法和用法详细信息,这些语法和用法详细信息单独描述如下所示:
- [SELECT](../../sql-reference/statements/select/index.md)
- [INSERT INTO](../../sql-reference/statements/insert-into.md)
- [CREATE](../../sql-reference/statements/create/index.md)
- [ALTER](../../sql-reference/statements/alter/index.md)
- [SYSTEM](../../sql-reference/statements/system.md)
- [SHOW](../../sql-reference/statements/show.md)
- [GRANT](../../sql-reference/statements/grant.md)
- [REVOKE](../../sql-reference/statements/revoke.md)
- [ATTACH](../../sql-reference/statements/attach.md)
- [CHECK TABLE](../../sql-reference/statements/check-table.md)
- [DESCRIBE TABLE](../../sql-reference/statements/describe-table.md)
- [DETACH](../../sql-reference/statements/detach.md)
- [DROP](../../sql-reference/statements/drop.md)
- [EXISTS](../../sql-reference/statements/exists.md)
- [KILL](../../sql-reference/statements/kill.md)
- [OPTIMIZE](../../sql-reference/statements/optimize.md)
- [RENAME](../../sql-reference/statements/rename.md)
- [SET](../../sql-reference/statements/set.md)
- [SET ROLE](../../sql-reference/statements/set-role.md)
- [TRUNCATE](../../sql-reference/statements/truncate.md)
- [USE](../../sql-reference/statements/use.md)
- [EXPLAIN](../../sql-reference/statements/explain.md)

View File

@ -28,6 +28,7 @@
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/UseSSL.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTInsertQuery.h>
#include <base/ErrorHandlers.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>

View File

@ -584,7 +584,6 @@ if (ENABLE_TESTS AND USE_GTEST)
dbms
clickhouse_common_config
clickhouse_common_zookeeper
clickhouse_common_config
string_utils)
add_check(unit_tests_dbms)

View File

@ -8,7 +8,6 @@ set (SRCS
)
add_library(clickhouse_common_config ${SRCS})
target_link_libraries(clickhouse_common_config
PUBLIC
clickhouse_common_zookeeper
@ -18,9 +17,17 @@ target_link_libraries(clickhouse_common_config
string_utils
)
if (USE_YAML_CPP)
target_link_libraries(clickhouse_common_config
add_library(clickhouse_common_config_no_zookeeper_log ${SRCS})
target_link_libraries(clickhouse_common_config_no_zookeeper_log
PUBLIC
clickhouse_common_zookeeper_no_log
common
Poco::XML
PRIVATE
yaml-cpp
string_utils
)
if (USE_YAML_CPP)
target_link_libraries(clickhouse_common_config PRIVATE yaml-cpp)
target_link_libraries(clickhouse_common_config_no_zookeeper_log PRIVATE yaml-cpp)
endif()

View File

@ -15,6 +15,7 @@
#include <Common/formatReadable.h>
#include <Common/filesystemHelpers.h>
#include <Common/ErrorCodes.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <filesystem>
#include <Common/config_version.h>
@ -175,7 +176,7 @@ void tryLogCurrentException(const char * log_name, const std::string & start_of_
///
/// And in this case the exception will not be logged, so let's block the
/// MemoryTracker until the exception will be logged.
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
/// Poco::Logger::get can allocate memory too
tryLogCurrentExceptionImpl(&Poco::Logger::get(log_name), start_of_message);
@ -188,7 +189,7 @@ void tryLogCurrentException(Poco::Logger * logger, const std::string & start_of_
///
/// And in this case the exception will not be logged, so let's block the
/// MemoryTracker until the exception will be logged.
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
tryLogCurrentExceptionImpl(logger, start_of_message);
}

View File

@ -0,0 +1,20 @@
#include <Common/LockMemoryExceptionInThread.h>
/// LockMemoryExceptionInThread
thread_local uint64_t LockMemoryExceptionInThread::counter = 0;
thread_local VariableContext LockMemoryExceptionInThread::level = VariableContext::Global;
thread_local bool LockMemoryExceptionInThread::block_fault_injections = false;
LockMemoryExceptionInThread::LockMemoryExceptionInThread(VariableContext level_, bool block_fault_injections_)
: previous_level(level)
, previous_block_fault_injections(block_fault_injections)
{
++counter;
level = level_;
block_fault_injections = block_fault_injections_;
}
LockMemoryExceptionInThread::~LockMemoryExceptionInThread()
{
--counter;
level = previous_level;
block_fault_injections = previous_block_fault_injections;
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Common/VariableContext.h>
/// To be able to avoid MEMORY_LIMIT_EXCEEDED Exception in destructors:
/// - either configured memory limit reached
/// - or fault injected
///
/// So this will simply ignore the configured memory limit (and avoid fault injection).
///
/// NOTE: exception will be silently ignored, no message in log
/// (since logging from MemoryTracker::alloc() is tricky)
///
/// NOTE: MEMORY_LIMIT_EXCEEDED Exception implicitly blocked if
/// stack unwinding is currently in progress in this thread (to avoid
/// std::terminate()), so you don't need to use it in this case explicitly.
struct LockMemoryExceptionInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
static thread_local bool block_fault_injections;
VariableContext previous_level;
bool previous_block_fault_injections;
public:
/// level_ - block in level and above
/// block_fault_injections_ - block in fault injection too
explicit LockMemoryExceptionInThread(VariableContext level_ = VariableContext::User, bool block_fault_injections_ = true);
~LockMemoryExceptionInThread();
LockMemoryExceptionInThread(const LockMemoryExceptionInThread &) = delete;
LockMemoryExceptionInThread & operator=(const LockMemoryExceptionInThread &) = delete;
static bool isBlocked(VariableContext current_level, bool fault_injection)
{
return counter > 0 && current_level >= level && (!fault_injection || block_fault_injections);
}
};

View File

@ -1,12 +1,14 @@
#include "MemoryTracker.h"
#include <IO/WriteHelpers.h>
#include "Common/TraceCollector.h"
#include <Interpreters/TraceCollector.h>
#include <Common/Exception.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/formatReadable.h>
#include <base/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/thread_local_rng.h>
#include <base/logger_useful.h>
#include <atomic>
#include <cmath>
@ -34,7 +36,7 @@ namespace
/// noexcept(false)) will cause std::terminate()
bool inline memoryTrackerCanThrow(VariableContext level, bool fault_injection)
{
return !MemoryTracker::LockExceptionInThread::isBlocked(level, fault_injection) && !std::uncaught_exceptions();
return !LockMemoryExceptionInThread::isBlocked(level, fault_injection) && !std::uncaught_exceptions();
}
}
@ -55,41 +57,6 @@ namespace ProfileEvents
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
// BlockerInThread
thread_local uint64_t MemoryTracker::BlockerInThread::counter = 0;
thread_local VariableContext MemoryTracker::BlockerInThread::level = VariableContext::Global;
MemoryTracker::BlockerInThread::BlockerInThread(VariableContext level_)
: previous_level(level)
{
++counter;
level = level_;
}
MemoryTracker::BlockerInThread::~BlockerInThread()
{
--counter;
level = previous_level;
}
/// LockExceptionInThread
thread_local uint64_t MemoryTracker::LockExceptionInThread::counter = 0;
thread_local VariableContext MemoryTracker::LockExceptionInThread::level = VariableContext::Global;
thread_local bool MemoryTracker::LockExceptionInThread::block_fault_injections = false;
MemoryTracker::LockExceptionInThread::LockExceptionInThread(VariableContext level_, bool block_fault_injections_)
: previous_level(level)
, previous_block_fault_injections(block_fault_injections)
{
++counter;
level = level_;
block_fault_injections = block_fault_injections_;
}
MemoryTracker::LockExceptionInThread::~LockExceptionInThread()
{
--counter;
level = previous_level;
block_fault_injections = previous_block_fault_injections;
}
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
@ -133,9 +100,9 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (size < 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
if (BlockerInThread::isBlocked(level))
if (MemoryTrackerBlockerInThread::isBlocked(level))
{
/// Since the BlockerInThread should respect the level, we should go to the next parent.
/// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->allocImpl(size, throw_if_memory_exceeded);
return;
@ -184,7 +151,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true) && throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
const auto * description = description_ptr.load(std::memory_order_relaxed);
@ -203,7 +170,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
bool allocation_traced = false;
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
allocation_traced = true;
@ -212,7 +179,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
std::bernoulli_distribution sample(sample_probability);
if (unlikely(sample_probability && sample(thread_local_rng)))
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
allocation_traced = true;
}
@ -220,7 +187,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
const auto * description = description_ptr.load(std::memory_order_relaxed);
throw DB::Exception(
@ -237,7 +204,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
bool log_memory_usage = true;
peak_updated = updatePeak(will_be, log_memory_usage);
}
@ -249,7 +216,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
if (peak_updated && allocation_traced)
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemoryPeak, StackTrace(), will_be);
}
@ -288,9 +255,9 @@ bool MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
void MemoryTracker::free(Int64 size)
{
if (BlockerInThread::isBlocked(level))
if (MemoryTrackerBlockerInThread::isBlocked(level))
{
/// Since the BlockerInThread should respect the level, we should go to the next parent.
/// Since the MemoryTrackerBlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->free(size);
return;
@ -299,7 +266,7 @@ void MemoryTracker::free(Int64 size)
std::bernoulli_distribution sample(sample_probability);
if (unlikely(sample_probability && sample(thread_local_rng)))
{
BlockerInThread untrack_lock(VariableContext::Global);
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), -size);
}

View File

@ -31,6 +31,9 @@ extern thread_local bool memory_tracker_always_throw_logical_error_on_allocation
/** Tracks memory consumption.
* It throws an exception if amount of consumed memory become greater than certain limit.
* The same memory tracker could be simultaneously used in different threads.
*
* @see LockMemoryExceptionInThread
* @see MemoryTrackerBlockerInThread
*/
class MemoryTracker
{
@ -167,64 +170,6 @@ public:
/// Prints info about peak memory consumption into log.
void logPeakMemoryUsage() const;
/// To be able to temporarily stop memory tracking from current thread.
struct BlockerInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
VariableContext previous_level;
public:
/// level_ - block in level and above
explicit BlockerInThread(VariableContext level_ = VariableContext::User);
~BlockerInThread();
BlockerInThread(const BlockerInThread &) = delete;
BlockerInThread & operator=(const BlockerInThread &) = delete;
static bool isBlocked(VariableContext current_level)
{
return counter > 0 && current_level >= level;
}
};
/// To be able to avoid MEMORY_LIMIT_EXCEEDED Exception in destructors:
/// - either configured memory limit reached
/// - or fault injected
///
/// So this will simply ignore the configured memory limit (and avoid fault injection).
///
/// NOTE: exception will be silently ignored, no message in log
/// (since logging from MemoryTracker::alloc() is tricky)
///
/// NOTE: MEMORY_LIMIT_EXCEEDED Exception implicitly blocked if
/// stack unwinding is currently in progress in this thread (to avoid
/// std::terminate()), so you don't need to use it in this case explicitly.
struct LockExceptionInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
static thread_local bool block_fault_injections;
VariableContext previous_level;
bool previous_block_fault_injections;
public:
/// level_ - block in level and above
/// block_fault_injections_ - block in fault injection too
explicit LockExceptionInThread(VariableContext level_ = VariableContext::User, bool block_fault_injections_ = true);
~LockExceptionInThread();
LockExceptionInThread(const LockExceptionInThread &) = delete;
LockExceptionInThread & operator=(const LockExceptionInThread &) = delete;
static bool isBlocked(VariableContext current_level, bool fault_injection)
{
return counter > 0 && current_level >= level && (!fault_injection || block_fault_injections);
}
};
};
extern MemoryTracker total_memory_tracker;

View File

@ -0,0 +1,16 @@
#include <Common/MemoryTrackerBlockerInThread.h>
// MemoryTrackerBlockerInThread
thread_local uint64_t MemoryTrackerBlockerInThread::counter = 0;
thread_local VariableContext MemoryTrackerBlockerInThread::level = VariableContext::Global;
MemoryTrackerBlockerInThread::MemoryTrackerBlockerInThread(VariableContext level_)
: previous_level(level)
{
++counter;
level = level_;
}
MemoryTrackerBlockerInThread::~MemoryTrackerBlockerInThread()
{
--counter;
level = previous_level;
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <Common/VariableContext.h>
/// To be able to temporarily stop memory tracking from current thread.
struct MemoryTrackerBlockerInThread
{
private:
static thread_local uint64_t counter;
static thread_local VariableContext level;
VariableContext previous_level;
public:
/// level_ - block in level and above
explicit MemoryTrackerBlockerInThread(VariableContext level_ = VariableContext::User);
~MemoryTrackerBlockerInThread();
MemoryTrackerBlockerInThread(const MemoryTrackerBlockerInThread &) = delete;
MemoryTrackerBlockerInThread & operator=(const MemoryTrackerBlockerInThread &) = delete;
static bool isBlocked(VariableContext current_level)
{
return counter > 0 && current_level >= level;
}
};

View File

@ -1,9 +1,9 @@
#include "QueryProfiler.h"
#include <IO/WriteHelpers.h>
#include <Interpreters/TraceCollector.h>
#include <Common/Exception.h>
#include <Common/StackTrace.h>
#include <Common/TraceCollector.h>
#include <Common/thread_local_rng.h>
#include <base/logger_useful.h>
#include <base/phdr_cache.h>

View File

@ -4,6 +4,7 @@
#include <Common/ThreadStatus.h>
#include <base/errnoToString.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <base/getThreadId.h>
@ -11,6 +12,7 @@
#include <csignal>
#include <mutex>
#include <sys/mman.h>
namespace DB

View File

@ -1,187 +0,0 @@
#include "TraceCollector.h"
#include <Core/Field.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/TraceLog.h>
#include <Poco/Logger.h>
#include <Common/Exception.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <Common/setThreadName.h>
#include <base/logger_useful.h>
namespace DB
{
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
///
/// And it cannot be large, since otherwise it will not fit into PIPE_BUF.
/// The performance test query ids can be surprisingly long like
/// `aggregating_merge_tree_simple_aggregate_function_string.query100.profile100`,
/// so make some allowance for them as well.
constexpr size_t QUERY_ID_MAX_LEN = 128;
static_assert(QUERY_ID_MAX_LEN <= std::numeric_limits<uint8_t>::max());
}
LazyPipeFDs pipe;
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> trace_log_)
: trace_log(std::move(trace_log_))
{
pipe.open();
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
pipe.setNonBlockingWrite();
pipe.tryIncreaseSize(1 << 20);
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
TraceCollector::~TraceCollector()
{
if (!thread.joinable())
LOG_ERROR(&Poco::Logger::get("TraceCollector"), "TraceCollector thread is malformed and cannot be joined");
else
stop();
pipe.close();
}
void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trace, Int64 size)
{
constexpr size_t buf_size = sizeof(char) /// TraceCollector stop flag
+ sizeof(UInt8) /// String size
+ QUERY_ID_MAX_LEN /// Maximum query_id length
+ sizeof(UInt8) /// Number of stack frames
+ sizeof(StackTrace::FramePointers) /// Collected stack trace, maximum capacity
+ sizeof(TraceType) /// trace type
+ sizeof(UInt64) /// thread_id
+ sizeof(Int64); /// size
/// Write should be atomic to avoid overlaps
/// (since recursive collect() is possible)
static_assert(PIPE_BUF >= 512);
static_assert(buf_size <= 512, "Only write of PIPE_BUF to pipe is atomic and the minimal known PIPE_BUF across supported platforms is 512");
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id;
UInt64 thread_id;
if (CurrentThread::isInitialized())
{
query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
thread_id = CurrentThread::get().thread_id;
}
else
{
thread_id = MainThreadStatus::get()->thread_id;
}
writeChar(false, out); /// true if requested to stop the collecting thread.
writeBinary(static_cast<uint8_t>(query_id.size), out);
out.write(query_id.data, query_id.size);
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFramePointers()[i], out);
writePODBinary(trace_type, out);
writePODBinary(thread_id, out);
writePODBinary(size, out);
out.next();
}
/** Sends TraceCollector stop message
*
* Each sequence of data for TraceCollector thread starts with a boolean flag.
* If this flag is true, TraceCollector must stop reading trace_pipe and exit.
* This function sends flag with a true value to stop TraceCollector gracefully.
*/
void TraceCollector::stop()
{
WriteBufferFromFileDescriptor out(pipe.fds_rw[1]);
writeChar(true, out);
out.next();
thread.join();
}
void TraceCollector::run()
{
setThreadName("TraceCollector");
ReadBufferFromFileDescriptor in(pipe.fds_rw[0]);
while (true)
{
char is_last;
readChar(is_last, in);
if (is_last)
break;
std::string query_id;
UInt8 query_id_size = 0;
readBinary(query_id_size, in);
query_id.resize(query_id_size);
in.read(query_id.data(), query_id_size);
UInt8 trace_size = 0;
readIntBinary(trace_size, in);
Array trace;
trace.reserve(trace_size);
for (size_t i = 0; i < trace_size; ++i)
{
uintptr_t addr = 0;
readPODBinary(addr, in);
trace.emplace_back(UInt64(addr));
}
TraceType trace_type;
readPODBinary(trace_type, in);
UInt64 thread_id;
readPODBinary(thread_id, in);
Int64 size;
readPODBinary(size, in);
if (trace_log)
{
// time and time_in_microseconds are both being constructed from the same timespec so that the
// times will be equal up to the precision of a second.
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
UInt64 time = UInt64(ts.tv_sec * 1000000000LL + ts.tv_nsec);
UInt64 time_in_microseconds = UInt64((ts.tv_sec * 1000000LL) + (ts.tv_nsec / 1000));
TraceLogElement element{time_t(time / 1000000000), time_in_microseconds, time, trace_type, thread_id, query_id, trace, size};
trace_log->add(element);
}
}
}
}

View File

@ -1,46 +0,0 @@
#pragma once
#include "Common/PipeFDs.h"
#include <Common/ThreadPool.h>
class StackTrace;
namespace Poco
{
class Logger;
}
namespace DB
{
class TraceLog;
enum class TraceType : uint8_t
{
Real,
CPU,
Memory,
MemorySample,
MemoryPeak,
};
class TraceCollector
{
public:
TraceCollector(std::shared_ptr<TraceLog> trace_log_);
~TraceCollector();
/// Collect a stack trace. This method is signal safe.
/// Precondition: the TraceCollector object must be created.
/// size - for memory tracing is the amount of memory allocated; for other trace types it is 0.
static void collect(TraceType trace_type, const StackTrace & stack_trace, Int64 size);
private:
std::shared_ptr<TraceLog> trace_log;
ThreadFromGlobalPool thread;
void run();
void stop();
};
}

View File

@ -0,0 +1,78 @@
#include <Common/TraceSender.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <IO/WriteHelpers.h>
#include <Common/StackTrace.h>
#include <Common/CurrentThread.h>
namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
///
/// And it cannot be large, since otherwise it will not fit into PIPE_BUF.
/// The performance test query ids can be surprisingly long like
/// `aggregating_merge_tree_simple_aggregate_function_string.query100.profile100`,
/// so make some allowance for them as well.
constexpr size_t QUERY_ID_MAX_LEN = 128;
static_assert(QUERY_ID_MAX_LEN <= std::numeric_limits<uint8_t>::max());
}
namespace DB
{
LazyPipeFDs TraceSender::pipe;
void TraceSender::send(TraceType trace_type, const StackTrace & stack_trace, Int64 size)
{
constexpr size_t buf_size = sizeof(char) /// TraceCollector stop flag
+ sizeof(UInt8) /// String size
+ QUERY_ID_MAX_LEN /// Maximum query_id length
+ sizeof(UInt8) /// Number of stack frames
+ sizeof(StackTrace::FramePointers) /// Collected stack trace, maximum capacity
+ sizeof(TraceType) /// trace type
+ sizeof(UInt64) /// thread_id
+ sizeof(Int64); /// size
/// Write should be atomic to avoid overlaps
/// (since recursive collect() is possible)
static_assert(PIPE_BUF >= 512);
static_assert(buf_size <= 512, "Only write of PIPE_BUF to pipe is atomic and the minimal known PIPE_BUF across supported platforms is 512");
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id;
UInt64 thread_id;
if (CurrentThread::isInitialized())
{
query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
thread_id = CurrentThread::get().thread_id;
}
else
{
thread_id = MainThreadStatus::get()->thread_id;
}
writeChar(false, out); /// true if requested to stop the collecting thread.
writeBinary(static_cast<uint8_t>(query_id.size), out);
out.write(query_id.data, query_id.size);
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFramePointers()[i], out);
writePODBinary(trace_type, out);
writePODBinary(thread_id, out);
writePODBinary(size, out);
out.next();
}
}

36
src/Common/TraceSender.h Normal file
View File

@ -0,0 +1,36 @@
#pragma once
#include <Common/PipeFDs.h>
#include <base/types.h>
class StackTrace;
class TraceCollector;
namespace DB
{
enum class TraceType : uint8_t
{
Real,
CPU,
Memory,
MemorySample,
MemoryPeak,
};
/// This is the second part of TraceCollector, that sends stacktrace to the pipe.
/// It has been split out to avoid dependency from interpreters part.
class TraceSender
{
public:
/// Collect a stack trace. This method is signal safe.
/// Precondition: the TraceCollector object must be created.
/// size - for memory tracing is the amount of memory allocated; for other trace types it is 0.
static void send(TraceType trace_type, const StackTrace & stack_trace, Int64 size);
private:
friend class TraceCollector;
static LazyPipeFDs pipe;
};
}

View File

@ -2,9 +2,32 @@ include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
add_headers_and_sources(clickhouse_common_zookeeper .)
# for clickhouse server
add_library(clickhouse_common_zookeeper ${clickhouse_common_zookeeper_headers} ${clickhouse_common_zookeeper_sources})
target_compile_definitions (clickhouse_common_zookeeper PRIVATE -DZOOKEEPER_LOG)
target_link_libraries (clickhouse_common_zookeeper
PUBLIC
clickhouse_common_io
common
PRIVATE
string_utils
)
# To avoid circular dependency from interpreters.
if (OS_DARWIN)
target_link_libraries (clickhouse_common_zookeeper PRIVATE -Wl,-undefined,dynamic_lookup)
else()
target_link_libraries (clickhouse_common_zookeeper PRIVATE -Wl,--unresolved-symbols=ignore-all)
endif()
target_link_libraries (clickhouse_common_zookeeper PUBLIC clickhouse_common_io common PRIVATE string_utils)
# for examples -- no logging (to avoid extra dependencies)
add_library(clickhouse_common_zookeeper_no_log ${clickhouse_common_zookeeper_headers} ${clickhouse_common_zookeeper_sources})
target_link_libraries (clickhouse_common_zookeeper_no_log
PUBLIC
clickhouse_common_io
common
PRIVATE
string_utils
)
if (ENABLE_EXAMPLES)
add_subdirectory(examples)

View File

@ -1,5 +1,6 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>

View File

@ -1230,6 +1230,7 @@ void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
std::atomic_store(&zk_log, std::move(zk_log_));
}
#ifdef ZOOKEEPER_LOG
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize)
{
auto maybe_zk_log = std::atomic_load(&zk_log);
@ -1271,5 +1272,9 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const
maybe_zk_log->add(elem);
}
}
#else
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr &, const ZooKeeperResponsePtr &, bool)
{}
#endif
}

View File

@ -0,0 +1,93 @@
#include <Common/ZooKeeper/ZooKeeperLock.h>
#include <filesystem>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
namespace fs = std::filesystem;
namespace zkutil
{
ZooKeeperLock::ZooKeeperLock(
const ZooKeeperPtr & zookeeper_,
const std::string & lock_prefix_,
const std::string & lock_name_,
const std::string & lock_message_)
: zookeeper(zookeeper_)
, lock_path(fs::path(lock_prefix_) / lock_name_)
, lock_message(lock_message_)
, log(&Poco::Logger::get("zkutil::Lock"))
{
zookeeper->createIfNotExists(lock_prefix_, "");
}
ZooKeeperLock::~ZooKeeperLock()
{
try
{
unlock();
}
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void ZooKeeperLock::unlock()
{
if (!locked)
return;
locked = false;
if (zookeeper->expired())
{
LOG_WARNING(log, "Lock is lost, because session was expired. Path: {}, message: {}", lock_path, lock_message);
return;
}
Coordination::Stat stat;
/// NOTE It will throw if session expired after we checked it above
bool result = zookeeper->exists(lock_path, &stat);
if (result && stat.ephemeralOwner == zookeeper->getClientID())
zookeeper->remove(lock_path, -1);
else if (result)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Lock is lost, it has another owner. Path: {}, message: {}, owner: {}, our id: {}",
lock_path, lock_message, stat.ephemeralOwner, zookeeper->getClientID());
else
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Lock is lost, node does not exist. Path: {}, message: {}", lock_path, lock_message);
}
bool ZooKeeperLock::tryLock()
{
Coordination::Error code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZOK)
{
locked = true;
}
else if (code != Coordination::Error::ZNODEEXISTS)
{
throw Coordination::Exception(code);
}
return locked;
}
std::unique_ptr<ZooKeeperLock> createSimpleZooKeeperLock(
const ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
{
return std::make_unique<ZooKeeperLock>(zookeeper, lock_prefix, lock_name, lock_message);
}
}

View File

@ -0,0 +1,54 @@
#pragma once
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <memory>
#include <string>
#include <base/logger_useful.h>
namespace zkutil
{
/** Caveats: usage of locks in ZooKeeper is incorrect in 99% of cases,
* and highlights your poor understanding of distributed systems.
*
* It's only correct if all the operations that are performed under lock
* are atomically checking that the lock still holds
* or if we ensure that these operations will be undone if lock is lost
* (due to ZooKeeper session loss) that's very difficult to achieve.
*
* It's Ok if every operation that we perform under lock is actually operation in ZooKeeper.
*
* In 1% of cases when you can correctly use Lock, the logic is complex enough, so you don't need this class.
*
* TLDR: Don't use this code if you are not sure. We only have a few cases of it's usage.
*/
class ZooKeeperLock
{
public:
/// lock_prefix - path where the ephemeral lock node will be created
/// lock_name - the name of the ephemeral lock node
ZooKeeperLock(
const ZooKeeperPtr & zookeeper_,
const std::string & lock_prefix_,
const std::string & lock_name_,
const std::string & lock_message_ = "");
~ZooKeeperLock();
void unlock();
bool tryLock();
private:
zkutil::ZooKeeperPtr zookeeper;
std::string lock_path;
std::string lock_message;
Poco::Logger * log;
bool locked = false;
};
std::unique_ptr<ZooKeeperLock> createSimpleZooKeeperLock(
const ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message);
}

View File

@ -1,14 +1,14 @@
add_executable(zkutil_test_commands zkutil_test_commands.cpp)
target_link_libraries(zkutil_test_commands PRIVATE clickhouse_common_zookeeper)
target_link_libraries(zkutil_test_commands PRIVATE clickhouse_common_zookeeper_no_log)
add_executable(zkutil_test_commands_new_lib zkutil_test_commands_new_lib.cpp)
target_link_libraries(zkutil_test_commands_new_lib PRIVATE clickhouse_common_zookeeper string_utils)
target_link_libraries(zkutil_test_commands_new_lib PRIVATE clickhouse_common_zookeeper_no_log string_utils)
add_executable(zkutil_test_async zkutil_test_async.cpp)
target_link_libraries(zkutil_test_async PRIVATE clickhouse_common_zookeeper)
target_link_libraries(zkutil_test_async PRIVATE clickhouse_common_zookeeper_no_log)
add_executable (zk_many_watches_reconnect zk_many_watches_reconnect.cpp)
target_link_libraries (zk_many_watches_reconnect PRIVATE clickhouse_common_zookeeper clickhouse_common_config)
target_link_libraries (zk_many_watches_reconnect PRIVATE clickhouse_common_zookeeper_no_log clickhouse_common_config)
add_executable (zookeeper_impl zookeeper_impl.cpp)
target_link_libraries (zookeeper_impl PRIVATE clickhouse_common_zookeeper)
target_link_libraries (zookeeper_impl PRIVATE clickhouse_common_zookeeper_no_log)

View File

@ -9,6 +9,8 @@
#include <Common/getMaxFileDescriptorCount.h>
#include <Common/StringUtils/StringUtils.h>
#include <Coordination/Keeper4LWInfo.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <unistd.h>

View File

@ -1,16 +1,18 @@
#include <Coordination/KeeperStorage.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/setThreadName.h>
#include <mutex>
#include <functional>
#include <base/logger_useful.h>
#include <Common/StringUtils/StringUtils.h>
#include <sstream>
#include <iomanip>
#include <Common/hex.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <Poco/SHA1Engine.h>
#include <Poco/Base64Encoder.h>
#include <boost/algorithm/string.hpp>
#include <Common/hex.h>
#include <sstream>
#include <iomanip>
#include <mutex>
#include <functional>
#include <base/logger_useful.h>
namespace DB
{

View File

@ -6,6 +6,7 @@
#include <Coordination/SessionExpiryQueue.h>
#include <Coordination/ACLMap.h>
#include <Coordination/SnapshotableHashTable.h>
#include <IO/WriteBufferFromString.h>
#include <unordered_map>
#include <unordered_set>
#include <vector>

View File

@ -69,7 +69,7 @@ CacheDictionary<dictionary_key_type>::CacheDictionary(
, rnd_engine(randomSeed())
{
if (!source_ptr->supportsSelectiveLoad())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "{}: source cannot be used with CacheDictionary", full_name);
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "{}: source cannot be used with CacheDictionary", getFullName());
}
template <DictionaryKeyType dictionary_key_type>

View File

@ -28,7 +28,7 @@ DirectDictionary<dictionary_key_type>::DirectDictionary(
, source_ptr{std::move(source_ptr_)}
{
if (!source_ptr->supportsSelectiveLoad())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "{}: source cannot be used with DirectDictionary", full_name);
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "{}: source cannot be used with DirectDictionary", getFullName());
}
template <DictionaryKeyType dictionary_key_type>

View File

@ -370,7 +370,7 @@ void FlatDictionary::loadData()
updateData();
if (configuration.require_nonempty && 0 == element_count)
throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", full_name);
throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", getFullName());
}
void FlatDictionary::calculateBytesAllocated()
@ -478,7 +478,7 @@ void FlatDictionary::resize(Attribute & attribute, UInt64 key)
if (key >= configuration.max_array_size)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"{}: identifier should be less than {}",
full_name,
getFullName(),
toString(configuration.max_array_size));
auto & container = std::get<ContainerType<T>>(attribute.container);

View File

@ -695,7 +695,7 @@ void HashedArrayDictionary<dictionary_key_type>::loadData()
if (configuration.require_nonempty && 0 == element_count)
throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY,
"{}: dictionary source is empty and 'require_nonempty' property is set.",
full_name);
getFullName());
}
template <DictionaryKeyType dictionary_key_type>

View File

@ -583,7 +583,7 @@ void HashedDictionary<dictionary_key_type, sparse>::loadData()
if (configuration.require_nonempty && 0 == element_count)
throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY,
"{}: dictionary source is empty and 'require_nonempty' property is set.",
full_name);
getFullName());
}
template <DictionaryKeyType dictionary_key_type, bool sparse>

View File

@ -54,11 +54,14 @@ class IDictionary : public IExternalLoadable
public:
explicit IDictionary(const StorageID & dictionary_id_)
: dictionary_id(dictionary_id_)
, full_name(dictionary_id.getInternalDictionaryName())
{
}
const std::string & getFullName() const{ return full_name; }
std::string getFullName() const
{
std::lock_guard lock{name_mutex};
return dictionary_id.getInternalDictionaryName();
}
StorageID getDictionaryID() const
{
@ -73,7 +76,11 @@ public:
dictionary_id = new_name;
}
const std::string & getLoadableName() const override final { return getFullName(); }
std::string getLoadableName() const override final
{
std::lock_guard lock{name_mutex};
return dictionary_id.getInternalDictionaryName();
}
/// Specifies that no database is used.
/// Sometimes we cannot simply use an empty string for that because an empty string is
@ -270,7 +277,6 @@ private:
mutable StorageID dictionary_id;
protected:
const String full_name;
String dictionary_comment;
};

View File

@ -336,7 +336,7 @@ void IPAddressDictionary::createAttributes()
if (attribute.is_nullable)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"{}: array or nullable attributes not supported for dictionary of type {}",
full_name,
getFullName(),
getTypeName());
attribute_index_by_name.emplace(attribute.name, attributes.size());
@ -345,7 +345,7 @@ void IPAddressDictionary::createAttributes()
if (attribute.hierarchical)
throw Exception(ErrorCodes::TYPE_MISMATCH,
"{}: hierarchical attributes not supported for dictionary of type {}",
full_name,
getFullName(),
getTypeName());
}
};
@ -520,7 +520,7 @@ void IPAddressDictionary::loadData()
LOG_TRACE(logger, "{} ip records are read", ip_records.size());
if (require_nonempty && 0 == element_count)
throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", full_name);
throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", getFullName());
}
template <typename T>
@ -781,7 +781,7 @@ const IPAddressDictionary::Attribute & IPAddressDictionary::getAttribute(const s
{
const auto it = attribute_index_by_name.find(attribute_name);
if (it == std::end(attribute_index_by_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: no such attribute '{}'", full_name, attribute_name);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: no such attribute '{}'", getFullName(), attribute_name);
return attributes[it->second];
}

View File

@ -16,6 +16,7 @@
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/MemorySanitizer.h>
#include <Common/CurrentMetrics.h>
#include <Common/HashTable/HashMap.h>
#include <IO/AIO.h>
#include <IO/BufferWithOwnMemory.h>

View File

@ -7,39 +7,38 @@
#include <type_traits>
#include <base/wide_integer_to_string.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeInterval.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeInterval.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/Native.h>
#include <DataTypes/NumberTraits.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnNullable.h>
#include "Core/DecimalFunctions.h"
#include "IFunction.h"
#include "FunctionHelpers.h"
#include "IsOperation.h"
#include "DivisionUtils.h"
#include "castTypeToEither.h"
#include "FunctionFactory.h"
#include <Common/Arena.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Functions/DivisionUtils.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/IsOperation.h>
#include <Functions/castTypeToEither.h>
#include <Interpreters/castColumn.h>
#include <base/TypeList.h>
#include <base/map.h>
#include <Common/config.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#if USE_EMBEDDED_COMPILER
# pragma GCC diagnostic push
@ -134,11 +133,11 @@ public:
Case<IsDataTypeDecimal<LeftDataType> && IsIntegralOrExtended<RightDataType>, LeftDataType>,
Case<IsDataTypeDecimal<RightDataType> && IsIntegralOrExtended<LeftDataType>, RightDataType>,
/// e.g Decimal * Float64 = Float64
Case<IsOperation<Operation>::multiply && IsDataTypeDecimal<LeftDataType> && IsFloatingPoint<RightDataType>,
RightDataType>,
Case<IsOperation<Operation>::multiply && IsDataTypeDecimal<RightDataType> && IsFloatingPoint<LeftDataType>,
LeftDataType>,
/// e.g Decimal +-*/ Float, least(Decimal, Float), greatest(Decimal, Float) = Float64
Case<IsOperation<Operation>::allow_decimal && IsDataTypeDecimal<LeftDataType> && IsFloatingPoint<RightDataType>,
DataTypeFloat64>,
Case<IsOperation<Operation>::allow_decimal && IsDataTypeDecimal<RightDataType> && IsFloatingPoint<LeftDataType>,
DataTypeFloat64>,
/// Decimal <op> Real is not supported (traditional DBs convert Decimal <op> Real to Real)
Case<IsDataTypeDecimal<LeftDataType> && !IsIntegralOrExtendedOrDecimal<RightDataType>, InvalidType>,
@ -959,25 +958,16 @@ class FunctionBinaryArithmetic : public IFunction
static constexpr const bool left_is_decimal = is_decimal<T0>;
static constexpr const bool right_is_decimal = is_decimal<T1>;
static constexpr const bool result_is_decimal = IsDataTypeDecimal<ResultDataType>;
typename ColVecResult::MutablePtr col_res = nullptr;
const ResultDataType type = [&]
{
if constexpr (left_is_decimal && IsFloatingPoint<RightDataType>)
return RightDataType();
else if constexpr (right_is_decimal && IsFloatingPoint<LeftDataType>)
return LeftDataType();
else
return decimalResultType<is_multiply, is_division>(left, right);
}();
const ResultDataType type = decimalResultType<is_multiply, is_division>(left, right);
const ResultType scale_a = [&]
{
if constexpr (IsDataTypeDecimal<RightDataType> && is_division)
return right.getScaleMultiplier(); // the division impl uses only the scale_a
else if constexpr (result_is_decimal)
else
{
if constexpr (is_multiply)
// the decimal impl uses scales, but if the result is decimal, both of the arguments are decimal,
@ -988,37 +978,14 @@ class FunctionBinaryArithmetic : public IFunction
else
return type.scaleFactorFor(left, false);
}
else if constexpr (left_is_decimal)
{
if (col_left_const)
// the column will be converted to native type later, no need to scale it twice.
// the explicit type is needed to specify lambda return type
return ResultType{1};
return 1 / DecimalUtils::convertTo<ResultType>(left.getScaleMultiplier(), 0);
}
else
return 1; // the default value which won't cause any re-scale
}();
const ResultType scale_b = [&]
{
if constexpr (result_is_decimal)
{
if constexpr (is_multiply)
return ResultType{1};
else
return type.scaleFactorFor(right, is_division);
}
else if constexpr (right_is_decimal)
{
if (col_right_const)
return ResultType{1};
return 1 / DecimalUtils::convertTo<ResultType>(right.getScaleMultiplier(), 0);
}
else
return 1;
}();
/// non-vector result
@ -1029,20 +996,15 @@ class FunctionBinaryArithmetic : public IFunction
ResultType res = {};
if (!right_nullmap || !(*right_nullmap)[0])
res = check_decimal_overflow ? OpImplCheck::template process<left_is_decimal, right_is_decimal>(const_a, const_b, scale_a, scale_b)
res = check_decimal_overflow
? OpImplCheck::template process<left_is_decimal, right_is_decimal>(const_a, const_b, scale_a, scale_b)
: OpImpl::template process<left_is_decimal, right_is_decimal>(const_a, const_b, scale_a, scale_b);
if constexpr (result_is_decimal)
return ResultDataType(type.getPrecision(), type.getScale()).createColumnConst(
col_left_const->size(), toField(res, type.getScale()));
else
return ResultDataType().createColumnConst(col_left_const->size(), toField(res));
return ResultDataType(type.getPrecision(), type.getScale())
.createColumnConst(col_left_const->size(), toField(res, type.getScale()));
}
if constexpr (result_is_decimal)
col_res = ColVecResult::create(0, type.getScale());
else
col_res = ColVecResult::create(0);
col_res = ColVecResult::create(0, type.getScale());
auto & vec_res = col_res->getData();
vec_res.resize(col_left_size);
@ -1219,8 +1181,7 @@ public:
}
else if constexpr ((IsDataTypeDecimal<LeftDataType> && IsFloatingPoint<RightDataType>) ||
(IsDataTypeDecimal<RightDataType> && IsFloatingPoint<LeftDataType>))
type_res = std::make_shared<std::conditional_t<IsFloatingPoint<LeftDataType>,
LeftDataType, RightDataType>>();
type_res = std::make_shared<DataTypeFloat64>();
else if constexpr (IsDataTypeDecimal<LeftDataType>)
type_res = std::make_shared<LeftDataType>(left.getPrecision(), left.getScale());
else if constexpr (IsDataTypeDecimal<RightDataType>)
@ -1453,15 +1414,33 @@ public:
else // we can't avoid the else because otherwise the compiler may assume the ResultDataType may be Invalid
// and that would produce the compile error.
{
using T0 = typename LeftDataType::FieldType;
using T1 = typename RightDataType::FieldType;
constexpr bool decimal_with_float = (IsDataTypeDecimal<LeftDataType> && IsFloatingPoint<RightDataType>)
|| (IsFloatingPoint<LeftDataType> && IsDataTypeDecimal<RightDataType>);
using T0 = std::conditional_t<decimal_with_float, Float64, typename LeftDataType::FieldType>;
using T1 = std::conditional_t<decimal_with_float, Float64, typename RightDataType::FieldType>;
using ResultType = typename ResultDataType::FieldType;
using ColVecT0 = ColumnVectorOrDecimal<T0>;
using ColVecT1 = ColumnVectorOrDecimal<T1>;
using ColVecResult = ColumnVectorOrDecimal<ResultType>;
const auto * const col_left_raw = arguments[0].column.get();
const auto * const col_right_raw = arguments[1].column.get();
ColumnPtr left_col = nullptr;
ColumnPtr right_col = nullptr;
/// When Decimal op Float32/64, convert both of them into Float64
if constexpr (decimal_with_float)
{
const auto converted_type = std::make_shared<DataTypeFloat64>();
left_col = castColumn(arguments[0], converted_type);
right_col = castColumn(arguments[1], converted_type);
}
else
{
left_col = arguments[0].column;
right_col = arguments[1].column;
}
const auto * const col_left_raw = left_col.get();
const auto * const col_right_raw = right_col.get();
const size_t col_left_size = col_left_raw->size();
@ -1471,7 +1450,7 @@ public:
const ColVecT0 * const col_left = checkAndGetColumn<ColVecT0>(col_left_raw);
const ColVecT1 * const col_right = checkAndGetColumn<ColVecT1>(col_right_raw);
if constexpr (IsDataTypeDecimal<LeftDataType> || IsDataTypeDecimal<RightDataType>)
if constexpr (IsDataTypeDecimal<ResultDataType>)
{
return executeNumericWithDecimal<LeftDataType, RightDataType, ResultDataType>(
left, right,
@ -1525,11 +1504,7 @@ public:
const T1 value = col_right_const->template getValue<T1>();
OpImpl::template process<OpCase::RightConstant>(
col_left->getData().data(),
&value,
vec_res.data(),
vec_res.size(),
right_nullmap);
col_left->getData().data(), &value, vec_res.data(), vec_res.size(), right_nullmap);
}
else
return nullptr;

View File

@ -57,10 +57,7 @@ struct IsOperation
static constexpr bool division = div_floating || div_int || div_int_or_zero;
static constexpr bool allow_decimal =
plus || minus || multiply ||
div_floating || div_int || div_int_or_zero ||
least || greatest;
static constexpr bool allow_decimal = plus || minus || multiply || division || least || greatest;
};
}

View File

@ -8,7 +8,7 @@
#include <string.h>
#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <IO/BufferBase.h>
@ -116,7 +116,7 @@ public:
return;
/// finalize() is often called from destructors.
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
LockMemoryExceptionInThread lock(VariableContext::Global);
try
{
finalizeImpl();

View File

@ -3,6 +3,8 @@
#include <Interpreters/SystemLog.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <vector>
#include <atomic>

View File

@ -61,7 +61,7 @@ public:
const ExternalLoadableLifetime & getLifetime() const override;
const std::string & getLoadableName() const override { return name; }
std::string getLoadableName() const override { return name; }
bool supportUpdates() const override { return true; }

View File

@ -63,6 +63,7 @@
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/Session.h>
#include <Interpreters/TraceCollector.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/UncompressedCache.h>
#include <IO/MMappedFileCache.h>
@ -74,7 +75,6 @@
#include <Common/Config/AbstractConfigurationComparison.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ShellCommand.h>
#include <Common/TraceCollector.h>
#include <base/logger_useful.h>
#include <base/EnumReflection.h>
#include <Common/RemoteHostFilter.h>

View File

@ -1,6 +1,8 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
/// Call this function on crash.

View File

@ -21,6 +21,7 @@
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeperLock.h>
#include <Common/isLocalAddress.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
@ -54,113 +55,6 @@ namespace ErrorCodes
constexpr const char * TASK_PROCESSED_OUT_REASON = "Task has been already processed";
/** Caveats: usage of locks in ZooKeeper is incorrect in 99% of cases,
* and highlights your poor understanding of distributed systems.
*
* It's only correct if all the operations that are performed under lock
* are atomically checking that the lock still holds
* or if we ensure that these operations will be undone if lock is lost
* (due to ZooKeeper session loss) that's very difficult to achieve.
*
* It's Ok if every operation that we perform under lock is actually operation in ZooKeeper.
*
* In 1% of cases when you can correctly use Lock, the logic is complex enough, so you don't need this class.
*
* TLDR: Don't use this code.
* We only have a few cases of it's usage and it will be removed.
*/
class ZooKeeperLock
{
public:
/// lock_prefix - path where the ephemeral lock node will be created
/// lock_name - the name of the ephemeral lock node
ZooKeeperLock(
const zkutil::ZooKeeperPtr & zookeeper_,
const std::string & lock_prefix_,
const std::string & lock_name_,
const std::string & lock_message_ = "")
:
zookeeper(zookeeper_),
lock_path(fs::path(lock_prefix_) / lock_name_),
lock_message(lock_message_),
log(&Poco::Logger::get("zkutil::Lock"))
{
zookeeper->createIfNotExists(lock_prefix_, "");
}
~ZooKeeperLock()
{
try
{
unlock();
}
catch (...)
{
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void unlock()
{
if (!locked)
return;
locked = false;
if (zookeeper->expired())
{
LOG_WARNING(log, "Lock is lost, because session was expired. Path: {}, message: {}", lock_path, lock_message);
return;
}
Coordination::Stat stat;
std::string dummy;
/// NOTE It will throw if session expired after we checked it above
bool result = zookeeper->tryGet(lock_path, dummy, &stat);
if (result && stat.ephemeralOwner == zookeeper->getClientID())
zookeeper->remove(lock_path, -1);
else if (result)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Lock is lost, it has another owner. Path: {}, message: {}, owner: {}, our id: {}",
lock_path, lock_message, stat.ephemeralOwner, zookeeper->getClientID());
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Lock is lost, node does not exist. Path: {}, message: {}", lock_path, lock_message);
}
bool tryLock()
{
std::string dummy;
Coordination::Error code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy);
if (code == Coordination::Error::ZOK)
{
locked = true;
}
else if (code != Coordination::Error::ZNODEEXISTS)
{
throw Coordination::Exception(code);
}
return locked;
}
private:
zkutil::ZooKeeperPtr zookeeper;
std::string lock_path;
std::string lock_message;
Poco::Logger * log;
bool locked = false;
};
std::unique_ptr<ZooKeeperLock> createSimpleZooKeeperLock(
const zkutil::ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
{
return std::make_unique<ZooKeeperLock>(zookeeper, lock_prefix, lock_name, lock_message);
}
DDLWorker::DDLWorker(
int pool_size_,
const std::string & zk_root_dir,
@ -656,7 +550,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
/// We must hold the lock until task execution status is committed to ZooKeeper,
/// otherwise another replica may try to execute query again.
std::unique_ptr<ZooKeeperLock> execute_on_leader_lock;
std::unique_ptr<zkutil::ZooKeeperLock> execute_on_leader_lock;
/// Step 2: Execute query from the task.
if (!task.was_executed)
@ -776,7 +670,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
const String & rewritten_query,
const String & /*node_path*/,
const ZooKeeperPtr & zookeeper,
std::unique_ptr<ZooKeeperLock> & execute_on_leader_lock)
std::unique_ptr<zkutil::ZooKeeperLock> & execute_on_leader_lock)
{
StorageReplicatedMergeTree * replicated_storage = dynamic_cast<StorageReplicatedMergeTree *>(storage.get());

View File

@ -30,6 +30,11 @@ namespace Coordination
struct Stat;
}
namespace zkutil
{
class ZooKeeperLock;
}
namespace DB
{
class ASTAlterQuery;
@ -38,7 +43,6 @@ struct DDLTaskBase;
using DDLTaskPtr = std::unique_ptr<DDLTaskBase>;
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
class AccessRightsElements;
class ZooKeeperLock;
class DDLWorker
{
@ -95,7 +99,7 @@ protected:
const String & rewritten_query,
const String & node_path,
const ZooKeeperPtr & zookeeper,
std::unique_ptr<ZooKeeperLock> & execute_on_leader_lock);
std::unique_ptr<zkutil::ZooKeeperLock> & execute_on_leader_lock);
bool tryExecuteQuery(const String & query, DDLTaskBase & task, const ZooKeeperPtr & zookeeper);

View File

@ -37,7 +37,7 @@ public:
virtual const ExternalLoadableLifetime & getLifetime() const = 0;
virtual const std::string & getLoadableName() const = 0;
virtual std::string getLoadableName() const = 0;
/// True if object can be updated when lifetime exceeded.
virtual bool supportUpdates() const = 0;
/// If lifetime exceeded and isModified(), ExternalLoader replace current object with the result of clone().

View File

@ -1,5 +1,6 @@
#include <Interpreters/IInterpreter.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/Context.h>
namespace DB
{

View File

@ -3,6 +3,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
{

View File

@ -13,6 +13,7 @@
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTAssignment.h>
#include <Parsers/ASTIdentifier_fwd.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Storages/AlterCommands.h>
#include <Storages/IStorage.h>
#include <Storages/LiveView/LiveViewCommands.h>

View File

@ -39,6 +39,7 @@
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Access/Common/AccessRightsElement.h>

View File

@ -6,6 +6,7 @@
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/TableOverrideUtils.h>
#include <Formats/FormatFactory.h>
@ -15,6 +16,7 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Storages/StorageView.h>
#include <Processors/QueryPlan/QueryPlan.h>

View File

@ -16,6 +16,7 @@ limitations under the License. */
#include <Interpreters/Context.h>
#include <Access/Common/AccessFlags.h>
#include <QueryPipeline/StreamLocalLimits.h>
#include <Storages/IStorage.h>
namespace DB

View File

@ -15,7 +15,7 @@ limitations under the License. */
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/SelectQueryInfo.h>
namespace DB

View File

@ -3,6 +3,8 @@
#include <Interpreters/SystemLog.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <vector>
#include <atomic>

View File

@ -8,8 +8,10 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeUUID.h>
#include <Interpreters/Context.h>
#include <Common/hex.h>
#include <Common/CurrentThread.h>
namespace DB

View File

@ -1,6 +1,8 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
namespace DB
{

View File

@ -1,6 +1,8 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
namespace DB

View File

@ -1,6 +1,8 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Core/Settings.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>

View File

@ -2,6 +2,8 @@
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
namespace ProfileEvents

View File

@ -9,6 +9,8 @@
#include <Core/SettingsEnums.h>
#include <Core/Types.h>
#include <Core/UUID.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Interpreters/SystemLog.h>
#include <base/types.h>

View File

@ -21,6 +21,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
#include <Access/SettingsProfilesInfo.h>
#include <Interpreters/Context.h>
#include <cassert>

View File

@ -3,6 +3,9 @@
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
#include <Access/Common/AuthenticationData.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Columns/IColumn.h>
namespace DB
{

View File

@ -10,17 +10,38 @@
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTRenameQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTInsertQuery.h>
#include <Storages/IStorage.h>
#include <Common/setThreadName.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <IO/WriteHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <base/logger_useful.h>
#include <base/scope_guard.h>
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
namespace
@ -96,6 +117,9 @@ std::shared_ptr<TSystemLog> createSystemLog(
}
///
/// ISystemLog
///
ASTPtr ISystemLog::getCreateTableQueryClean(const StorageID & table_id, ContextPtr context)
{
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
@ -111,7 +135,40 @@ ASTPtr ISystemLog::getCreateTableQueryClean(const StorageID & table_id, ContextP
return old_ast;
}
void ISystemLog::stopFlushThread()
{
{
std::lock_guard lock(mutex);
if (!saving_thread.joinable())
{
return;
}
if (is_shutdown)
{
return;
}
is_shutdown = true;
/// Tell thread to shutdown.
flush_event.notify_all();
}
saving_thread.join();
}
void ISystemLog::startup()
{
std::lock_guard lock(mutex);
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
}
///
/// SystemLogs
///
SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config)
{
query_log = createSystemLog<QueryLog>(global_context, "system", "query_log", config, "query_log");
@ -193,4 +250,392 @@ void SystemLogs::shutdown()
log->shutdown();
}
///
/// SystemLog
///
template <typename LogElement>
SystemLog<LogElement>::SystemLog(
ContextPtr context_,
const String & database_name_,
const String & table_name_,
const String & storage_def_,
size_t flush_interval_milliseconds_)
: WithContext(context_)
, table_id(database_name_, table_name_)
, storage_def(storage_def_)
, create_query(serializeAST(*getCreateTableQuery()))
, flush_interval_milliseconds(flush_interval_milliseconds_)
{
assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE);
log = &Poco::Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")");
}
static thread_local bool recursive_add_call = false;
template <typename LogElement>
void SystemLog<LogElement>::add(const LogElement & element)
{
/// It is possible that the method will be called recursively.
/// Better to drop these events to avoid complications.
if (recursive_add_call)
return;
recursive_add_call = true;
SCOPE_EXIT({ recursive_add_call = false; });
/// Memory can be allocated while resizing on queue.push_back.
/// The size of allocation can be in order of a few megabytes.
/// But this should not be accounted for query memory usage.
/// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flacky.
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
/// Should not log messages under mutex.
bool queue_is_half_full = false;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
if (queue.size() == DBMS_SYSTEM_LOG_QUEUE_SIZE / 2)
{
queue_is_half_full = true;
// The queue more than half full, time to flush.
// We only check for strict equality, because messages are added one
// by one, under exclusive lock, so we will see each message count.
// It is enough to only wake the flushing thread once, after the message
// count increases past half available size.
const uint64_t queue_end = queue_front_index + queue.size();
if (requested_flush_up_to < queue_end)
requested_flush_up_to = queue_end;
flush_event.notify_all();
}
if (queue.size() >= DBMS_SYSTEM_LOG_QUEUE_SIZE)
{
// Ignore all further entries until the queue is flushed.
// Log a message about that. Don't spam it -- this might be especially
// problematic in case of trace log. Remember what the front index of the
// queue was when we last logged the message. If it changed, it means the
// queue was flushed, and we can log again.
if (queue_front_index != logged_queue_full_at_index)
{
logged_queue_full_at_index = queue_front_index;
// TextLog sets its logger level to 0, so this log is a noop and
// there is no recursive logging.
lock.unlock();
LOG_ERROR(log, "Queue is full for system log '{}' at {}", demangle(typeid(*this).name()), queue_front_index);
}
return;
}
queue.push_back(element);
}
if (queue_is_half_full)
LOG_INFO(log, "Queue is half full for system log '{}'.", demangle(typeid(*this).name()));
}
template <typename LogElement>
void SystemLog<LogElement>::shutdown()
{
stopFlushThread();
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
table->flushAndShutdown();
}
template <typename LogElement>
void SystemLog<LogElement>::flush(bool force)
{
uint64_t this_thread_requested_offset;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
this_thread_requested_offset = queue_front_index + queue.size();
// Publish our flush request, taking care not to overwrite the requests
// made by other threads.
is_force_prepare_tables |= force;
requested_flush_up_to = std::max(requested_flush_up_to,
this_thread_requested_offset);
flush_event.notify_all();
}
LOG_DEBUG(log, "Requested flush up to offset {}",
this_thread_requested_offset);
// Use an arbitrary timeout to avoid endless waiting. 60s proved to be
// too fast for our parallel functional tests, probably because they
// heavily load the disk.
const int timeout_seconds = 180;
std::unique_lock lock(mutex);
bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds),
[&] { return flushed_up_to >= this_thread_requested_offset
&& !is_force_prepare_tables; });
if (!result)
{
throw Exception("Timeout exceeded (" + toString(timeout_seconds) + " s) while flushing system log '" + demangle(typeid(*this).name()) + "'.",
ErrorCodes::TIMEOUT_EXCEEDED);
}
}
template <typename LogElement>
void SystemLog<LogElement>::savingThreadFunction()
{
setThreadName("SystemLogFlush");
std::vector<LogElement> to_flush;
bool exit_this_thread = false;
while (!exit_this_thread)
{
try
{
// The end index (exclusive, like std end()) of the messages we are
// going to flush.
uint64_t to_flush_end = 0;
// Should we prepare table even if there are no new messages.
bool should_prepare_tables_anyway = false;
{
std::unique_lock lock(mutex);
flush_event.wait_for(lock,
std::chrono::milliseconds(flush_interval_milliseconds),
[&] ()
{
return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables;
}
);
queue_front_index += queue.size();
to_flush_end = queue_front_index;
// Swap with existing array from previous flush, to save memory
// allocations.
to_flush.resize(0);
queue.swap(to_flush);
should_prepare_tables_anyway = is_force_prepare_tables;
exit_this_thread = is_shutdown;
}
if (to_flush.empty())
{
if (should_prepare_tables_anyway)
{
prepareTable();
LOG_TRACE(log, "Table created (force)");
std::lock_guard lock(mutex);
is_force_prepare_tables = false;
flush_event.notify_all();
}
}
else
{
flushImpl(to_flush, to_flush_end);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
LOG_TRACE(log, "Terminating");
}
template <typename LogElement>
void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end)
{
try
{
LOG_TRACE(log, "Flushing system log, {} entries to flush up to offset {}",
to_flush.size(), to_flush_end);
/// We check for existence of the table and create it as needed at every
/// flush. This is done to allow user to drop the table at any moment
/// (new empty table will be created automatically). BTW, flush method
/// is called from single thread.
prepareTable();
ColumnsWithTypeAndName log_element_columns;
auto log_element_names_and_types = LogElement::getNamesAndTypes();
for (const auto & name_and_type : log_element_names_and_types)
log_element_columns.emplace_back(name_and_type.type, name_and_type.name);
Block block(std::move(log_element_columns));
MutableColumns columns = block.mutateColumns();
for (const auto & elem : to_flush)
elem.appendToBlock(columns);
block.setColumns(std::move(columns));
/// We write to table indirectly, using InterpreterInsertQuery.
/// This is needed to support DEFAULT-columns in table.
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->table_id = table_id;
ASTPtr query_ptr(insert.release());
// we need query context to do inserts to target table with MV containing subqueries or joins
auto insert_context = Context::createCopy(context);
insert_context->makeQueryContext();
InterpreterInsertQuery interpreter(query_ptr, insert_context);
BlockIO io = interpreter.execute();
PushingPipelineExecutor executor(io.pipeline);
executor.start();
executor.push(block);
executor.finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
{
std::lock_guard lock(mutex);
flushed_up_to = to_flush_end;
is_force_prepare_tables = false;
flush_event.notify_all();
}
LOG_TRACE(log, "Flushed system log up to offset {}", to_flush_end);
}
template <typename LogElement>
void SystemLog<LogElement>::prepareTable()
{
String description = table_id.getNameForLogs();
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
{
if (old_create_query.empty())
{
old_create_query = serializeAST(*getCreateTableQueryClean(table_id, getContext()));
if (old_create_query.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty CREATE QUERY for {}", backQuoteIfNeed(table_id.table_name));
}
if (old_create_query != create_query)
{
/// Rename the existing table.
int suffix = 0;
while (DatabaseCatalog::instance().isTableExist(
{table_id.database_name, table_id.table_name + "_" + toString(suffix)}, getContext()))
++suffix;
auto rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Table from;
from.database = table_id.database_name;
from.table = table_id.table_name;
ASTRenameQuery::Table to;
to.database = table_id.database_name;
to.table = table_id.table_name + "_" + toString(suffix);
ASTRenameQuery::Element elem;
elem.from = from;
elem.to = to;
rename->elements.emplace_back(elem);
LOG_DEBUG(
log,
"Existing table {} for system log has obsolete or different structure. Renaming it to {}.\nOld: {}\nNew: {}\n.",
description,
backQuoteIfNeed(to.table),
old_create_query,
create_query);
auto query_context = Context::createCopy(context);
query_context->makeQueryContext();
InterpreterRenameQuery(rename, query_context).execute();
/// The required table will be created.
table = nullptr;
}
else if (!is_prepared)
LOG_DEBUG(log, "Will use existing table {} for {}", description, LogElement::name());
}
if (!table)
{
/// Create the table.
LOG_DEBUG(log, "Creating new table {} for {}", description, LogElement::name());
auto query_context = Context::createCopy(context);
query_context->makeQueryContext();
auto create_query_ast = getCreateTableQuery();
InterpreterCreateQuery interpreter(create_query_ast, query_context);
interpreter.setInternal(true);
interpreter.execute();
table = DatabaseCatalog::instance().getTable(table_id, getContext());
old_create_query.clear();
}
is_prepared = true;
}
template <typename LogElement>
ASTPtr SystemLog<LogElement>::getCreateTableQuery()
{
auto create = std::make_shared<ASTCreateQuery>();
create->setDatabase(table_id.database_name);
create->setTable(table_id.table_name);
auto ordinary_columns = LogElement::getNamesAndTypes();
auto alias_columns = LogElement::getNamesAndAliases();
auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(ordinary_columns, alias_columns));
create->set(create->columns_list, new_columns_list);
ParserStorage storage_parser;
ASTPtr storage_ast = parseQuery(
storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
"Storage to create table for " + LogElement::name(), 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
create->set(create->storage, storage_ast);
return create;
}
template class SystemLog<AsynchronousMetricLogElement>;
template class SystemLog<CrashLogElement>;
template class SystemLog<MetricLogElement>;
template class SystemLog<OpenTelemetrySpanLogElement>;
template class SystemLog<PartLogElement>;
template class SystemLog<QueryLogElement>;
template class SystemLog<QueryThreadLogElement>;
template class SystemLog<QueryViewsLogElement>;
template class SystemLog<SessionLogElement>;
template class SystemLog<TraceLogElement>;
template class SystemLog<ZooKeeperLogElement>;
template class SystemLog<TextLogElement>;
}

View File

@ -4,32 +4,27 @@
#include <atomic>
#include <memory>
#include <vector>
#include <condition_variable>
#include <boost/noncopyable.hpp>
#include <base/logger_useful.h>
#include <base/scope_guard.h>
#include <base/types.h>
#include <Core/Defines.h>
#include <Storages/IStorage.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTRenameQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Common/setThreadName.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/StorageID.h>
#include <Parsers/IAST_fwd.h>
#include <Common/ThreadPool.h>
#include <IO/WriteHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace Poco
{
class Logger;
namespace Util
{
class AbstractConfiguration;
}
}
namespace DB
{
@ -58,14 +53,6 @@ namespace DB
};
*/
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576
class QueryLog;
class QueryThreadLog;
class PartLog;
@ -87,15 +74,33 @@ public:
//// force -- force table creation (used for SYSTEM FLUSH LOGS)
virtual void flush(bool force = false) = 0;
virtual void prepareTable() = 0;
virtual void startup() = 0;
/// Start the background thread.
virtual void startup();
/// Stop the background flush thread before destructor. No more data will be written.
virtual void shutdown() = 0;
virtual ~ISystemLog() = default;
virtual void savingThreadFunction() = 0;
/// returns CREATE TABLE query, but with removed:
/// - UUID
/// - SETTINGS (for MergeTree)
/// That way it can be used to compare with the SystemLog::getCreateTableQuery()
static ASTPtr getCreateTableQueryClean(const StorageID & table_id, ContextPtr context);
protected:
ThreadFromGlobalPool saving_thread;
/// Data shared between callers of add()/flush()/shutdown(), and the saving thread
std::mutex mutex;
bool is_shutdown = false;
std::condition_variable flush_event;
void stopFlushThread();
};
@ -156,23 +161,10 @@ public:
*/
void add(const LogElement & element);
void stopFlushThread();
void shutdown() override;
/// Flush data in the buffer to disk
void flush(bool force = false) override;
/// Start the background thread.
void startup() override;
/// Stop the background flush thread before destructor. No more data will be written.
void shutdown() override
{
stopFlushThread();
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
table->flushAndShutdown();
}
void flush(bool force) override;
String getName() override
{
@ -192,10 +184,7 @@ private:
String old_create_query;
bool is_prepared = false;
const size_t flush_interval_milliseconds;
ThreadFromGlobalPool saving_thread;
/* Data shared between callers of add()/flush()/shutdown(), and the saving thread */
std::mutex mutex;
// Queue is bounded. But its size is quite large to not block in all normal cases.
std::vector<LogElement> queue;
// An always-incrementing index of the first message currently in the queue.
@ -203,10 +192,8 @@ private:
// can wait until a particular message is flushed. This is used to implement
// synchronous log flushing for SYSTEM FLUSH LOGS.
uint64_t queue_front_index = 0;
bool is_shutdown = false;
// A flag that says we must create the tables even if the queue is empty.
bool is_force_prepare_tables = false;
std::condition_variable flush_event;
// Requested to flush logs up to this index, exclusive
uint64_t requested_flush_up_to = 0;
// Flushed log up to this index, exclusive
@ -214,7 +201,7 @@ private:
// Logged overflow message at this queue front index
uint64_t logged_queue_full_at_index = -1;
void savingThreadFunction();
void savingThreadFunction() override;
/** Creates new table if it does not exist.
* Renames old table if its structure is not suitable.
@ -226,403 +213,4 @@ private:
void flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end);
};
template <typename LogElement>
SystemLog<LogElement>::SystemLog(
ContextPtr context_,
const String & database_name_,
const String & table_name_,
const String & storage_def_,
size_t flush_interval_milliseconds_)
: WithContext(context_)
, table_id(database_name_, table_name_)
, storage_def(storage_def_)
, create_query(serializeAST(*getCreateTableQuery()))
, flush_interval_milliseconds(flush_interval_milliseconds_)
{
assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE);
log = &Poco::Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")");
}
template <typename LogElement>
void SystemLog<LogElement>::startup()
{
std::lock_guard lock(mutex);
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
}
static thread_local bool recursive_add_call = false;
template <typename LogElement>
void SystemLog<LogElement>::add(const LogElement & element)
{
/// It is possible that the method will be called recursively.
/// Better to drop these events to avoid complications.
if (recursive_add_call)
return;
recursive_add_call = true;
SCOPE_EXIT({ recursive_add_call = false; });
/// Memory can be allocated while resizing on queue.push_back.
/// The size of allocation can be in order of a few megabytes.
/// But this should not be accounted for query memory usage.
/// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flacky.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
/// Should not log messages under mutex.
bool queue_is_half_full = false;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
if (queue.size() == DBMS_SYSTEM_LOG_QUEUE_SIZE / 2)
{
queue_is_half_full = true;
// The queue more than half full, time to flush.
// We only check for strict equality, because messages are added one
// by one, under exclusive lock, so we will see each message count.
// It is enough to only wake the flushing thread once, after the message
// count increases past half available size.
const uint64_t queue_end = queue_front_index + queue.size();
if (requested_flush_up_to < queue_end)
requested_flush_up_to = queue_end;
flush_event.notify_all();
}
if (queue.size() >= DBMS_SYSTEM_LOG_QUEUE_SIZE)
{
// Ignore all further entries until the queue is flushed.
// Log a message about that. Don't spam it -- this might be especially
// problematic in case of trace log. Remember what the front index of the
// queue was when we last logged the message. If it changed, it means the
// queue was flushed, and we can log again.
if (queue_front_index != logged_queue_full_at_index)
{
logged_queue_full_at_index = queue_front_index;
// TextLog sets its logger level to 0, so this log is a noop and
// there is no recursive logging.
lock.unlock();
LOG_ERROR(log, "Queue is full for system log '{}' at {}", demangle(typeid(*this).name()), queue_front_index);
}
return;
}
queue.push_back(element);
}
if (queue_is_half_full)
LOG_INFO(log, "Queue is half full for system log '{}'.", demangle(typeid(*this).name()));
}
template <typename LogElement>
void SystemLog<LogElement>::flush(bool force)
{
uint64_t this_thread_requested_offset;
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
this_thread_requested_offset = queue_front_index + queue.size();
// Publish our flush request, taking care not to overwrite the requests
// made by other threads.
is_force_prepare_tables |= force;
requested_flush_up_to = std::max(requested_flush_up_to,
this_thread_requested_offset);
flush_event.notify_all();
}
LOG_DEBUG(log, "Requested flush up to offset {}",
this_thread_requested_offset);
// Use an arbitrary timeout to avoid endless waiting. 60s proved to be
// too fast for our parallel functional tests, probably because they
// heavily load the disk.
const int timeout_seconds = 180;
std::unique_lock lock(mutex);
bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds),
[&] { return flushed_up_to >= this_thread_requested_offset
&& !is_force_prepare_tables; });
if (!result)
{
throw Exception("Timeout exceeded (" + toString(timeout_seconds) + " s) while flushing system log '" + demangle(typeid(*this).name()) + "'.",
ErrorCodes::TIMEOUT_EXCEEDED);
}
}
template <typename LogElement>
void SystemLog<LogElement>::stopFlushThread()
{
{
std::lock_guard lock(mutex);
if (!saving_thread.joinable())
{
return;
}
if (is_shutdown)
{
return;
}
is_shutdown = true;
/// Tell thread to shutdown.
flush_event.notify_all();
}
saving_thread.join();
}
template <typename LogElement>
void SystemLog<LogElement>::savingThreadFunction()
{
setThreadName("SystemLogFlush");
std::vector<LogElement> to_flush;
bool exit_this_thread = false;
while (!exit_this_thread)
{
try
{
// The end index (exclusive, like std end()) of the messages we are
// going to flush.
uint64_t to_flush_end = 0;
// Should we prepare table even if there are no new messages.
bool should_prepare_tables_anyway = false;
{
std::unique_lock lock(mutex);
flush_event.wait_for(lock,
std::chrono::milliseconds(flush_interval_milliseconds),
[&] ()
{
return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables;
}
);
queue_front_index += queue.size();
to_flush_end = queue_front_index;
// Swap with existing array from previous flush, to save memory
// allocations.
to_flush.resize(0);
queue.swap(to_flush);
should_prepare_tables_anyway = is_force_prepare_tables;
exit_this_thread = is_shutdown;
}
if (to_flush.empty())
{
if (should_prepare_tables_anyway)
{
prepareTable();
LOG_TRACE(log, "Table created (force)");
std::lock_guard lock(mutex);
is_force_prepare_tables = false;
flush_event.notify_all();
}
}
else
{
flushImpl(to_flush, to_flush_end);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
LOG_TRACE(log, "Terminating");
}
template <typename LogElement>
void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end)
{
try
{
LOG_TRACE(log, "Flushing system log, {} entries to flush up to offset {}",
to_flush.size(), to_flush_end);
/// We check for existence of the table and create it as needed at every
/// flush. This is done to allow user to drop the table at any moment
/// (new empty table will be created automatically). BTW, flush method
/// is called from single thread.
prepareTable();
ColumnsWithTypeAndName log_element_columns;
auto log_element_names_and_types = LogElement::getNamesAndTypes();
for (auto name_and_type : log_element_names_and_types)
log_element_columns.emplace_back(name_and_type.type, name_and_type.name);
Block block(std::move(log_element_columns));
MutableColumns columns = block.mutateColumns();
for (const auto & elem : to_flush)
elem.appendToBlock(columns);
block.setColumns(std::move(columns));
/// We write to table indirectly, using InterpreterInsertQuery.
/// This is needed to support DEFAULT-columns in table.
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->table_id = table_id;
ASTPtr query_ptr(insert.release());
// we need query context to do inserts to target table with MV containing subqueries or joins
auto insert_context = Context::createCopy(context);
insert_context->makeQueryContext();
InterpreterInsertQuery interpreter(query_ptr, insert_context);
BlockIO io = interpreter.execute();
PushingPipelineExecutor executor(io.pipeline);
executor.start();
executor.push(block);
executor.finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
{
std::lock_guard lock(mutex);
flushed_up_to = to_flush_end;
is_force_prepare_tables = false;
flush_event.notify_all();
}
LOG_TRACE(log, "Flushed system log up to offset {}", to_flush_end);
}
template <typename LogElement>
void SystemLog<LogElement>::prepareTable()
{
String description = table_id.getNameForLogs();
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
{
if (old_create_query.empty())
{
old_create_query = serializeAST(*getCreateTableQueryClean(table_id, getContext()));
if (old_create_query.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty CREATE QUERY for {}", backQuoteIfNeed(table_id.table_name));
}
if (old_create_query != create_query)
{
/// Rename the existing table.
int suffix = 0;
while (DatabaseCatalog::instance().isTableExist(
{table_id.database_name, table_id.table_name + "_" + toString(suffix)}, getContext()))
++suffix;
auto rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Table from;
from.database = table_id.database_name;
from.table = table_id.table_name;
ASTRenameQuery::Table to;
to.database = table_id.database_name;
to.table = table_id.table_name + "_" + toString(suffix);
ASTRenameQuery::Element elem;
elem.from = from;
elem.to = to;
rename->elements.emplace_back(elem);
LOG_DEBUG(
log,
"Existing table {} for system log has obsolete or different structure. Renaming it to {}.\nOld: {}\nNew: {}\n.",
description,
backQuoteIfNeed(to.table),
old_create_query,
create_query);
auto query_context = Context::createCopy(context);
query_context->makeQueryContext();
InterpreterRenameQuery(rename, query_context).execute();
/// The required table will be created.
table = nullptr;
}
else if (!is_prepared)
LOG_DEBUG(log, "Will use existing table {} for {}", description, LogElement::name());
}
if (!table)
{
/// Create the table.
LOG_DEBUG(log, "Creating new table {} for {}", description, LogElement::name());
auto query_context = Context::createCopy(context);
query_context->makeQueryContext();
auto create_query_ast = getCreateTableQuery();
InterpreterCreateQuery interpreter(create_query_ast, query_context);
interpreter.setInternal(true);
interpreter.execute();
table = DatabaseCatalog::instance().getTable(table_id, getContext());
old_create_query.clear();
}
is_prepared = true;
}
template <typename LogElement>
ASTPtr SystemLog<LogElement>::getCreateTableQuery()
{
auto create = std::make_shared<ASTCreateQuery>();
create->setDatabase(table_id.database_name);
create->setTable(table_id.table_name);
auto ordinary_columns = LogElement::getNamesAndTypes();
auto alias_columns = LogElement::getNamesAndAliases();
auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(ordinary_columns, alias_columns));
create->set(create->columns_list, new_columns_list);
ParserStorage storage_parser;
ASTPtr storage_ast = parseQuery(
storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
"Storage to create table for " + LogElement::name(), 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
create->set(create->storage, storage_ast);
return create;
}
}

View File

@ -8,6 +8,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <base/logger_useful.h>
#include <array>

View File

@ -1,5 +1,9 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Poco/Message.h>
namespace DB
{

View File

@ -7,6 +7,7 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Interpreters/TraceCollector.h>
#include <Parsers/formatAST.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
@ -14,7 +15,8 @@
#include <Common/QueryProfiler.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/TraceCollector.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <base/errnoToString.h>
#if defined(OS_LINUX)
@ -341,7 +343,7 @@ void ThreadStatus::finalizeQueryProfiler()
void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
{
MemoryTracker::LockExceptionInThread lock(VariableContext::Global);
LockMemoryExceptionInThread lock(VariableContext::Global);
if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
{

View File

@ -0,0 +1,114 @@
#include "TraceCollector.h"
#include <Core/Field.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/TraceLog.h>
#include <Poco/Logger.h>
#include <Common/setThreadName.h>
#include <base/logger_useful.h>
namespace DB
{
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> trace_log_)
: trace_log(std::move(trace_log_))
{
TraceSender::pipe.open();
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
TraceSender::pipe.setNonBlockingWrite();
TraceSender::pipe.tryIncreaseSize(1 << 20);
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
TraceCollector::~TraceCollector()
{
if (!thread.joinable())
LOG_ERROR(&Poco::Logger::get("TraceCollector"), "TraceCollector thread is malformed and cannot be joined");
else
stop();
TraceSender::pipe.close();
}
/** Sends TraceCollector stop message
*
* Each sequence of data for TraceCollector thread starts with a boolean flag.
* If this flag is true, TraceCollector must stop reading trace_pipe and exit.
* This function sends flag with a true value to stop TraceCollector gracefully.
*/
void TraceCollector::stop()
{
WriteBufferFromFileDescriptor out(TraceSender::pipe.fds_rw[1]);
writeChar(true, out);
out.next();
thread.join();
}
void TraceCollector::run()
{
setThreadName("TraceCollector");
ReadBufferFromFileDescriptor in(TraceSender::pipe.fds_rw[0]);
while (true)
{
char is_last;
readChar(is_last, in);
if (is_last)
break;
std::string query_id;
UInt8 query_id_size = 0;
readBinary(query_id_size, in);
query_id.resize(query_id_size);
in.read(query_id.data(), query_id_size);
UInt8 trace_size = 0;
readIntBinary(trace_size, in);
Array trace;
trace.reserve(trace_size);
for (size_t i = 0; i < trace_size; ++i)
{
uintptr_t addr = 0;
readPODBinary(addr, in);
trace.emplace_back(UInt64(addr));
}
TraceType trace_type;
readPODBinary(trace_type, in);
UInt64 thread_id;
readPODBinary(thread_id, in);
Int64 size;
readPODBinary(size, in);
if (trace_log)
{
// time and time_in_microseconds are both being constructed from the same timespec so that the
// times will be equal up to the precision of a second.
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
UInt64 time = UInt64(ts.tv_sec * 1000000000LL + ts.tv_nsec);
UInt64 time_in_microseconds = UInt64((ts.tv_sec * 1000000LL) + (ts.tv_nsec / 1000));
TraceLogElement element{time_t(time / 1000000000), time_in_microseconds, time, trace_type, thread_id, query_id, trace, size};
trace_log->add(element);
}
}
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Common/TraceSender.h>
class StackTrace;
namespace Poco
{
class Logger;
}
namespace DB
{
class TraceLog;
class TraceCollector
{
public:
TraceCollector(std::shared_ptr<TraceLog> trace_log_);
~TraceCollector();
static inline void collect(TraceType trace_type, const StackTrace & stack_trace, Int64 size)
{
return TraceSender::send(trace_type, stack_trace, size);
}
private:
std::shared_ptr<TraceLog> trace_log;
ThreadFromGlobalPool thread;
void run();
void stop();
};
}

View File

@ -3,8 +3,10 @@
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/TraceCollector.h>
#include <Common/QueryProfiler.h>
#include <Common/TraceCollector.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
namespace DB

View File

@ -33,7 +33,7 @@ public:
return lifetime;
}
const std::string & getLoadableName() const override
std::string getLoadableName() const override
{
return configuration.name;
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>

View File

@ -2,6 +2,7 @@
#include <Common/PODArray.h>
#include <Common/typeid_cast.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <IO/WriteBufferFromFile.h>
@ -29,6 +30,7 @@
#include <Parsers/ParserQuery.h>
#include <Parsers/queryNormalization.h>
#include <Parsers/queryToString.h>
#include <Parsers/formatAST.h>
#include <Formats/FormatFactory.h>
#include <Storages/StorageInput.h>
@ -38,6 +40,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/NormalizeSelectWithUnionQueryVisitor.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ProcessList.h>
@ -193,7 +196,7 @@ static void setExceptionStackTrace(QueryLogElement & elem)
{
/// Disable memory tracker for stack trace.
/// Because if exception is "Memory limit (for query) exceed", then we probably can't allocate another one string.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
try
{

View File

@ -15,10 +15,13 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/CurrentMetrics.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <base/JSON.h>
#include <base/logger_useful.h>
#include <Compression/getCompressionCodecForFile.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ExpressionElementParsers.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeAggregateFunction.h>
@ -615,7 +618,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
/// Memory should not be limited during ATTACH TABLE query.
/// This is already true at the server startup but must be also ensured for manual table ATTACH.
/// Motivation: memory for index is shared between queries - not belong to the query itself.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
loadUUID();
loadColumns(require_columns_checksums);

View File

@ -2,8 +2,9 @@
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Common/CurrentMetrics.h>
#include <base/getThreadId.h>
#include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h>
#include <base/getThreadId.h>
namespace DB

View File

@ -5516,6 +5516,9 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
{
LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size());
const auto settings = getSettings();
bool result = true;
for (const auto & moving_part : moving_tagger->parts_to_move)
{
Stopwatch stopwatch;
@ -5535,8 +5538,41 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
try
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);
/// If zero-copy replication enabled than replicas shouldn't try to
/// move parts to another disk simultaneously. For this purpose we
/// use shared lock across replicas. NOTE: it's not 100% reliable,
/// because we are not checking lock while finishing part move.
/// However it's not dangerous at all, we will just have very rare
/// copies of some part.
///
/// FIXME: this code is related to Replicated merge tree, and not
/// common for ordinary merge tree. So it's a bad design and should
/// be fixed.
auto disk = moving_part.reserved_space->getDisk();
if (supportsReplication() && disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
{
/// If we acuqired lock than let's try to move. After one
/// replica will actually move the part from disk to some
/// zero-copy storage other replicas will just fetch
/// metainformation.
if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part, disk); lock)
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);
}
else
{
/// Move will be retried but with backoff.
LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name);
result = false;
continue;
}
}
else /// Ordinary move as it should be
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);
}
write_part_log({});
}
catch (...)
@ -5548,7 +5584,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
throw;
}
}
return true;
return result;
}
bool MergeTreeData::partsContainSameProjections(const DataPartPtr & left, const DataPartPtr & right)

View File

@ -27,6 +27,8 @@
#include <Interpreters/Aggregator.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
@ -43,6 +45,7 @@ class MergeTreeDataMergerMutator;
class MutationCommands;
class Context;
struct JobAndPool;
struct ZeroCopyLock;
/// Auxiliary struct holding information about the future merged or mutated part.
struct EmergingPartInfo
@ -1189,6 +1192,10 @@ private:
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal,
DataPartsLock & part_lock);
/// Create zero-copy exclusive lock for part and disk. Useful for coordination of
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const DataPartPtr &, const DiskPtr &) { return std::nullopt; }
};
/// RAII struct to record big parts that are submerging or emerging.

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <utility>
@ -184,7 +185,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
* And otherwise it will look like excessively growing memory consumption in context of query.
* (observed in long INSERT SELECTs)
*/
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (const auto & granule : granules_to_write)

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <IO/ReadBufferFromFile.h>
#include <utility>
@ -47,7 +48,7 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
{
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
size_t file_size = disk->getFileSize(mrk_path);
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);

View File

@ -200,7 +200,7 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
auto settings = data->getSettings();
auto part = moving_part.part;
auto disk = moving_part.reserved_space->getDisk();
LOG_DEBUG(log, "Cloning part {} from {} to {}", part->name, part->volume->getDisk()->getName(), disk->getName());
LOG_DEBUG(log, "Cloning part {} from '{}' to '{}'", part->name, part->volume->getDisk()->getName(), disk->getName());
const String directory_to_move = "moving";
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)

View File

@ -1,13 +1,14 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/CurrentMetrics.h>
#include <Parsers/formatAST.h>
namespace DB

View File

@ -0,0 +1,9 @@
#include "ZeroCopyLock.h"
namespace DB
{
ZeroCopyLock::ZeroCopyLock(const zkutil::ZooKeeperPtr & zookeeper, const std::string & lock_path)
: lock(zkutil::createSimpleZooKeeperLock(zookeeper, lock_path, "part_exclusive_lock", ""))
{
}
}

View File

@ -0,0 +1,21 @@
#pragma once
#include <Core/Types.h>
#include <optional>
#include <memory>
#include <Common/ZooKeeper/ZooKeeperLock.h>
#include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{
/// Very simple wrapper for zookeeper ephemeral lock. It's better to have it
/// because due to bad abstraction we use it in MergeTreeData.
struct ZeroCopyLock
{
ZeroCopyLock(const zkutil::ZooKeeperPtr & zookeeper, const std::string & lock_path);
/// Actual lock
std::unique_ptr<zkutil::ZooKeeperLock> lock;
};
}

View File

@ -9,6 +9,7 @@
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/formatAST.h>
#include <Core/Defines.h>
#include <Interpreters/InterpreterSelectQuery.h>

View File

@ -13,7 +13,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
@ -467,7 +467,7 @@ static void appendBlock(const Block & from, Block & to)
MutableColumnPtr last_col;
try
{
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
if (to.rows() == 0)
{
@ -496,7 +496,7 @@ static void appendBlock(const Block & from, Block & to)
/// In case of rollback, it is better to ignore memory limits instead of abnormal server termination.
/// So ignore any memory limits, even global (since memory tracking has drift).
MemoryTracker::BlockerInThread temporarily_ignore_any_memory_limits(VariableContext::Global);
MemoryTrackerBlockerInThread temporarily_ignore_any_memory_limits(VariableContext::Global);
try
{
@ -924,7 +924,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
}
auto destination_metadata_snapshot = table->getInMemoryMetadataPtr();
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = destination_id;

View File

@ -33,6 +33,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <Parsers/parseQuery.h>
@ -44,6 +45,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/JoinedTables.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
@ -55,6 +57,7 @@
#include <Interpreters/getTableExpressions.h>
#include <Functions/IFunction.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>

View File

@ -11,6 +11,7 @@
#include <QueryPipeline/Pipe.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Parsers/ASTCreateQuery.h>
namespace DB

View File

@ -17,6 +17,7 @@
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/formatAST.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/AlterCommands.h>

View File

@ -31,17 +31,20 @@
#include <Storages/VirtualColumnUtils.h>
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Databases/DatabaseOnDisk.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sources/RemoteSource.h>
@ -7128,11 +7131,11 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
}
bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String & table_uuid, const String & part_name,
bool StorageReplicatedMergeTree::unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name,
const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings,
Poco::Logger * logger, const String & zookeeper_path_old)
{
boost::replace_all(id, "/", "_");
boost::replace_all(part_id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk->getType(), table_uuid, part_name, zookeeper_path_old);
@ -7140,13 +7143,16 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String &
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / id;
String zookeeper_node = fs::path(zookeeper_part_uniq_node) / replica_name_;
String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / part_id;
LOG_TRACE(logger, "Remove zookeeper lock {}", zookeeper_node);
/// Delete our replica node for part from zookeeper (we are not interested in it anymore)
String zookeeper_part_replica_node = fs::path(zookeeper_part_uniq_node) / replica_name_;
zookeeper_ptr->tryRemove(zookeeper_node);
LOG_TRACE(logger, "Remove zookeeper lock {}", zookeeper_part_replica_node);
zookeeper_ptr->tryRemove(zookeeper_part_replica_node);
/// Check, maybe we were the last replica and can remove part forever
Strings children;
zookeeper_ptr->tryGetChildren(zookeeper_part_uniq_node, children);
@ -7157,9 +7163,9 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String &
continue;
}
auto e = zookeeper_ptr->tryRemove(zookeeper_part_uniq_node);
auto error_code = zookeeper_ptr->tryRemove(zookeeper_part_uniq_node);
LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_uniq_node, e != Coordination::Error::ZNOTEMPTY);
LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_uniq_node, error_code != Coordination::Error::ZNOTEMPTY);
/// Even when we have lock with same part name, but with different uniq, we can remove files on S3
children.clear();
@ -7168,9 +7174,9 @@ bool StorageReplicatedMergeTree::unlockSharedDataByID(String id, const String &
if (children.empty())
{
/// Cleanup after last uniq removing
e = zookeeper_ptr->tryRemove(zookeeper_part_node);
error_code = zookeeper_ptr->tryRemove(zookeeper_part_node);
LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_node, e != Coordination::Error::ZNOTEMPTY);
LOG_TRACE(logger, "Remove parent zookeeper lock {} : {}", zookeeper_part_node, error_code != Coordination::Error::ZNOTEMPTY);
}
else
{
@ -7213,7 +7219,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return best_replica;
return "";
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), disk_type, getTableSharedID(), part.name,
zookeeper_path);
@ -7251,7 +7257,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
LOG_TRACE(log, "Found zookeper active replicas for part {}: {}", part.name, active_replicas.size());
if (active_replicas.empty())
return best_replica;
return "";
/** You must select the best (most relevant) replica.
* This is a replica with the maximum `log_pointer`, then with the minimum `queue` size.
@ -7305,6 +7311,30 @@ Strings StorageReplicatedMergeTree::getZeroCopyPartPath(const MergeTreeSettings
}
std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const DataPartPtr & part, const DiskPtr & disk)
{
if (!disk || !disk->supportZeroCopyReplication())
return std::nullopt;
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return std::nullopt;
String zc_zookeeper_path = getZeroCopyPartPath(*getSettings(), disk->getType(), getTableSharedID(),
part->name, zookeeper_path)[0];
/// Just recursively create ancestors for lock
zookeeper->createAncestors(zc_zookeeper_path);
zookeeper->createIfNotExists(zc_zookeeper_path, "");
/// Create actual lock
ZeroCopyLock lock(zookeeper, zc_zookeeper_path);
if (lock.lock->tryLock())
return lock;
else
return std::nullopt;
}
String StorageReplicatedMergeTree::findReplicaHavingPart(
const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_ptr)
{

View File

@ -243,7 +243,7 @@ public:
/// Unlock shared data part in zookeeper by part id
/// Return true if data unlocked
/// Return false if data is still used by another node
static bool unlockSharedDataByID(String id, const String & table_uuid, const String & part_name, const String & replica_name_,
static bool unlockSharedDataByID(String part_id, const String & table_uuid, const String & part_name, const String & replica_name_,
DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger,
const String & zookeeper_path_old);
@ -758,6 +758,10 @@ private:
// Create table id if needed
void createTableSharedID();
/// Create ephemeral lock in zookeeper for part and disk which support zero copy replication.
/// If somebody already holding the lock -- return std::nullopt.
std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const DataPartPtr & part, const DiskPtr & disk) override;
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/

View File

@ -9,6 +9,8 @@
#include <Interpreters/Context.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/QueryAliasesVisitor.h>
#include <Interpreters/QueryNormalizer.h>
@ -21,7 +23,9 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/formatAST.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Sources/BlocksSource.h>

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,26 @@
<clickhouse>
<storage_configuration>
<disks>
<s3_disk>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_disk>
</disks>
<policies>
<s3_and_default>
<volumes>
<main>
<disk>default</disk>
</main>
<external>
<disk>s3_disk</disk>
</external>
</volumes>
</s3_and_default>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,69 @@
#!/usr/bin/env python3
import time
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", main_configs=["configs/s3.xml"], with_minio=True, with_zookeeper=True)
node2 = cluster.add_instance("node2", main_configs=["configs/s3.xml"], with_minio=True, with_zookeeper=True)
node3 = cluster.add_instance("node3", main_configs=["configs/s3.xml"], with_minio=True, with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_ttl_move_and_s3(started_cluster):
for i, node in enumerate([node1, node2, node3]):
node.query(
"""
CREATE TABLE s3_test_with_ttl (date DateTime, id UInt32, value String)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/s3_test', '{}')
ORDER BY id
PARTITION BY id
TTL date TO DISK 's3_disk'
SETTINGS storage_policy='s3_and_default'
""".format(i))
node1.query("SYSTEM STOP MOVES s3_test_with_ttl")
node2.query("SYSTEM STOP MOVES s3_test_with_ttl")
for i in range(30):
if i % 2 == 0:
node = node1
else:
node = node2
node.query(f"INSERT INTO s3_test_with_ttl SELECT now() + 5, {i}, randomPrintableASCII(1048570)")
node1.query("SYSTEM SYNC REPLICA s3_test_with_ttl")
node2.query("SYSTEM SYNC REPLICA s3_test_with_ttl")
node3.query("SYSTEM SYNC REPLICA s3_test_with_ttl")
assert node1.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n"
assert node2.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n"
node1.query("SYSTEM START MOVES s3_test_with_ttl")
node2.query("SYSTEM START MOVES s3_test_with_ttl")
assert node1.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n"
assert node2.query("SELECT COUNT() FROM s3_test_with_ttl") == "30\n"
time.sleep(5)
print(node1.query("SELECT * FROM system.parts WHERE table = 's3_test_with_ttl' FORMAT Vertical"))
minio = cluster.minio_client
objects = minio.list_objects(cluster.minio_bucket, 'data/', recursive=True)
counter = 0
for obj in objects:
print("Objectname:", obj.object_name, "metadata:", obj.metadata)
counter += 1
print("Total objects", counter)
assert counter == 300

Some files were not shown because too many files have changed in this diff Show More