Merge branch 'master' into new-nav

This commit is contained in:
Rich Raposa 2023-03-17 11:41:34 -06:00 committed by GitHub
commit 55892bc3ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 706 additions and 309 deletions

View File

@ -162,56 +162,28 @@ Checks: '*,
WarningsAsErrors: '*' WarningsAsErrors: '*'
# TODO: use dictionary syntax for CheckOptions when minimum clang-tidy level rose to 15
# some-check.SomeOption: 'some value'
# instead of
# - key: some-check.SomeOption
# value: 'some value'
CheckOptions: CheckOptions:
- key: readability-identifier-naming.ClassCase readability-identifier-naming.ClassCase: CamelCase
value: CamelCase readability-identifier-naming.EnumCase: CamelCase
- key: readability-identifier-naming.EnumCase readability-identifier-naming.LocalVariableCase: lower_case
value: CamelCase readability-identifier-naming.StaticConstantCase: aNy_CasE
- key: readability-identifier-naming.LocalVariableCase readability-identifier-naming.MemberCase: lower_case
value: lower_case readability-identifier-naming.PrivateMemberPrefix: ''
- key: readability-identifier-naming.StaticConstantCase readability-identifier-naming.ProtectedMemberPrefix: ''
value: aNy_CasE readability-identifier-naming.PublicMemberCase: lower_case
- key: readability-identifier-naming.MemberCase readability-identifier-naming.MethodCase: camelBack
value: lower_case readability-identifier-naming.PrivateMethodPrefix: ''
- key: readability-identifier-naming.PrivateMemberPrefix readability-identifier-naming.ProtectedMethodPrefix: ''
value: '' readability-identifier-naming.ParameterPackCase: lower_case
- key: readability-identifier-naming.ProtectedMemberPrefix readability-identifier-naming.StructCase: CamelCase
value: '' readability-identifier-naming.TemplateTemplateParameterCase: CamelCase
- key: readability-identifier-naming.PublicMemberCase readability-identifier-naming.TemplateUsingCase: lower_case
value: lower_case readability-identifier-naming.TypeTemplateParameterCase: CamelCase
- key: readability-identifier-naming.MethodCase readability-identifier-naming.TypedefCase: CamelCase
value: camelBack readability-identifier-naming.UnionCase: CamelCase
- key: readability-identifier-naming.PrivateMethodPrefix readability-identifier-naming.UsingCase: CamelCase
value: '' modernize-loop-convert.UseCxx20ReverseRanges: false
- key: readability-identifier-naming.ProtectedMethodPrefix performance-move-const-arg.CheckTriviallyCopyableMove: false
value: ''
- key: readability-identifier-naming.ParameterPackCase
value: lower_case
- key: readability-identifier-naming.StructCase
value: CamelCase
- key: readability-identifier-naming.TemplateTemplateParameterCase
value: CamelCase
- key: readability-identifier-naming.TemplateUsingCase
value: lower_case
- key: readability-identifier-naming.TypeTemplateParameterCase
value: CamelCase
- key: readability-identifier-naming.TypedefCase
value: CamelCase
- key: readability-identifier-naming.UnionCase
value: CamelCase
- key: readability-identifier-naming.UsingCase
value: CamelCase
- key: modernize-loop-convert.UseCxx20ReverseRanges
value: false
- key: performance-move-const-arg.CheckTriviallyCopyableMove
value: false
# Workaround clang-tidy bug: https://github.com/llvm/llvm-project/issues/46097 # Workaround clang-tidy bug: https://github.com/llvm/llvm-project/issues/46097
- key: readability-identifier-naming.TypeTemplateParameterIgnoredRegexp readability-identifier-naming.TypeTemplateParameterIgnoredRegexp: expr-type
value: expr-type cppcoreguidelines-avoid-do-while.IgnoreMacros: true
- key: cppcoreguidelines-avoid-do-while.IgnoreMacros
value: true

View File

@ -184,26 +184,12 @@ if (OS_DARWIN)
set (ENABLE_CURL_BUILD OFF) set (ENABLE_CURL_BUILD OFF)
endif () endif ()
# Ignored if `lld` is used
option(ADD_GDB_INDEX_FOR_GOLD "Add .gdb-index to resulting binaries for gold linker.")
if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE") if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
# Can be lld or ld-lld or lld-13 or /path/to/lld. # Can be lld or ld-lld or lld-13 or /path/to/lld.
if (LINKER_NAME MATCHES "lld" AND OS_LINUX) if (LINKER_NAME MATCHES "lld" AND OS_LINUX)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--gdb-index") set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--gdb-index")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--gdb-index") set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--gdb-index")
message (STATUS "Adding .gdb-index via --gdb-index linker option.") message (STATUS "Adding .gdb-index via --gdb-index linker option.")
# we use another tool for gdb-index, because gold linker removes section .debug_aranges, which used inside clickhouse stacktraces
# http://sourceware-org.1504.n7.nabble.com/gold-No-debug-aranges-section-when-linking-with-gdb-index-td540965.html#a556932
elseif (LINKER_NAME MATCHES "gold$" AND ADD_GDB_INDEX_FOR_GOLD)
find_program (GDB_ADD_INDEX_EXE NAMES "gdb-add-index" DOC "Path to gdb-add-index executable")
if (NOT GDB_ADD_INDEX_EXE)
set (USE_GDB_ADD_INDEX 0)
message (WARNING "Cannot add gdb index to binaries, because gold linker is used, but gdb-add-index executable not found.")
else()
set (USE_GDB_ADD_INDEX 1)
message (STATUS "gdb-add-index found: ${GDB_ADD_INDEX_EXE}")
endif()
endif () endif ()
endif() endif()
@ -302,11 +288,11 @@ if (ENABLE_BUILD_PROFILING)
endif () endif ()
set (CMAKE_CXX_STANDARD 23) set (CMAKE_CXX_STANDARD 23)
set (CMAKE_CXX_EXTENSIONS ON) # Same as gnu++2a (ON) vs c++2a (OFF): https://cmake.org/cmake/help/latest/prop_tgt/CXX_EXTENSIONS.html set (CMAKE_CXX_EXTENSIONS OFF)
set (CMAKE_CXX_STANDARD_REQUIRED ON) set (CMAKE_CXX_STANDARD_REQUIRED ON)
set (CMAKE_C_STANDARD 11) set (CMAKE_C_STANDARD 11)
set (CMAKE_C_EXTENSIONS ON) set (CMAKE_C_EXTENSIONS ON) # required by most contribs written in C
set (CMAKE_C_STANDARD_REQUIRED ON) set (CMAKE_C_STANDARD_REQUIRED ON)
if (COMPILER_GCC OR COMPILER_CLANG) if (COMPILER_GCC OR COMPILER_CLANG)

View File

@ -50,15 +50,18 @@ endif ()
string (REGEX MATCHALL "[0-9]+" COMPILER_VERSION_LIST ${CMAKE_CXX_COMPILER_VERSION}) string (REGEX MATCHALL "[0-9]+" COMPILER_VERSION_LIST ${CMAKE_CXX_COMPILER_VERSION})
list (GET COMPILER_VERSION_LIST 0 COMPILER_VERSION_MAJOR) list (GET COMPILER_VERSION_LIST 0 COMPILER_VERSION_MAJOR)
# Example values: `lld-10`, `gold`. # Example values: `lld-10`
option (LINKER_NAME "Linker name or full path") option (LINKER_NAME "Linker name or full path")
if (LINKER_NAME MATCHES "gold")
message (FATAL_ERROR "Linking with gold is unsupported. Please use lld.")
endif ()
# s390x doesnt support lld # s390x doesnt support lld
if (NOT ARCH_S390X) if (NOT ARCH_S390X)
if (NOT LINKER_NAME) if (NOT LINKER_NAME)
if (COMPILER_GCC) if (COMPILER_GCC)
find_program (LLD_PATH NAMES "ld.lld") find_program (LLD_PATH NAMES "ld.lld")
find_program (GOLD_PATH NAMES "ld.gold")
elseif (COMPILER_CLANG) elseif (COMPILER_CLANG)
# llvm lld is a generic driver. # llvm lld is a generic driver.
# Invoke ld.lld (Unix), ld64.lld (macOS), lld-link (Windows), wasm-ld (WebAssembly) instead # Invoke ld.lld (Unix), ld64.lld (macOS), lld-link (Windows), wasm-ld (WebAssembly) instead
@ -67,13 +70,11 @@ if (NOT ARCH_S390X)
elseif (OS_DARWIN) elseif (OS_DARWIN)
find_program (LLD_PATH NAMES "ld64.lld-${COMPILER_VERSION_MAJOR}" "ld64.lld") find_program (LLD_PATH NAMES "ld64.lld-${COMPILER_VERSION_MAJOR}" "ld64.lld")
endif () endif ()
find_program (GOLD_PATH NAMES "ld.gold" "gold")
endif () endif ()
endif() endif()
endif() endif()
if ((OS_LINUX OR OS_DARWIN) AND NOT LINKER_NAME) if ((OS_LINUX OR OS_DARWIN) AND NOT LINKER_NAME)
# prefer lld linker over gold or ld on linux and macos
if (LLD_PATH) if (LLD_PATH)
if (COMPILER_GCC) if (COMPILER_GCC)
# GCC driver requires one of supported linker names like "lld". # GCC driver requires one of supported linker names like "lld".
@ -83,17 +84,6 @@ if ((OS_LINUX OR OS_DARWIN) AND NOT LINKER_NAME)
set (LINKER_NAME ${LLD_PATH}) set (LINKER_NAME ${LLD_PATH})
endif () endif ()
endif () endif ()
if (NOT LINKER_NAME)
if (GOLD_PATH)
message (FATAL_ERROR "Linking with gold is unsupported. Please use lld.")
if (COMPILER_GCC)
set (LINKER_NAME "gold")
else ()
set (LINKER_NAME ${GOLD_PATH})
endif ()
endif ()
endif ()
endif () endif ()
# TODO: allow different linker on != OS_LINUX # TODO: allow different linker on != OS_LINUX

View File

@ -22,15 +22,15 @@ tuple(x, y, …)
## tupleElement ## tupleElement
A function that allows getting a column from a tuple. A function that allows getting a column from a tuple.
N is the column index, starting from 1. N must be a constant. N must be a strict postive integer no greater than the size of the tuple.
There is no cost to execute the function.
The function implements the operator `x.N`. If the second argument is a number `n`, it is the column index, starting from 1. If the second argument is a string `s`, it represents the name of the element. Besides, we can provide the third optional argument, such that when index out of bounds or element for such name does not exist, the default value returned instead of throw exception. The second and third arguments if provided are always must be constant. There is no cost to execute the function.
The function implements the operator `x.n` and `x.s`.
**Syntax** **Syntax**
``` sql ``` sql
tupleElement(tuple, n) tupleElement(tuple, n/s [, default_value])
``` ```
## untuple ## untuple

View File

@ -18,7 +18,7 @@ Group=clickhouse
Restart=always Restart=always
RestartSec=30 RestartSec=30
# Since ClickHouse is systemd aware default 1m30sec may not be enough # Since ClickHouse is systemd aware default 1m30sec may not be enough
TimeoutStartSec=infinity TimeoutStartSec=0
# %p is resolved to the systemd unit name # %p is resolved to the systemd unit name
RuntimeDirectory=%p RuntimeDirectory=%p
ExecStart=/usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml --pid-file=%t/%p/%p.pid ExecStart=/usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml --pid-file=%t/%p/%p.pid

View File

@ -400,10 +400,6 @@ endif ()
add_custom_target (clickhouse-bundle ALL DEPENDS ${CLICKHOUSE_BUNDLE}) add_custom_target (clickhouse-bundle ALL DEPENDS ${CLICKHOUSE_BUNDLE})
if (USE_GDB_ADD_INDEX)
add_custom_command(TARGET clickhouse POST_BUILD COMMAND ${GDB_ADD_INDEX_EXE} clickhouse COMMENT "Adding .gdb-index to clickhouse" VERBATIM)
endif()
if (USE_BINARY_HASH) if (USE_BINARY_HASH)
add_custom_command(TARGET clickhouse POST_BUILD COMMAND ./clickhouse hash-binary > hash && ${OBJCOPY_PATH} --add-section .clickhouse.hash=hash clickhouse COMMENT "Adding section '.clickhouse.hash' to clickhouse binary" VERBATIM) add_custom_command(TARGET clickhouse POST_BUILD COMMAND ./clickhouse hash-binary > hash && ${OBJCOPY_PATH} --add-section .clickhouse.hash=hash clickhouse COMMENT "Adding section '.clickhouse.hash' to clickhouse binary" VERBATIM)
endif() endif()

View File

@ -35,10 +35,6 @@ target_link_libraries(clickhouse-odbc-bridge PRIVATE
set_target_properties(clickhouse-odbc-bridge PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..) set_target_properties(clickhouse-odbc-bridge PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..)
target_compile_options (clickhouse-odbc-bridge PRIVATE -Wno-reserved-id-macro -Wno-keyword-macro) target_compile_options (clickhouse-odbc-bridge PRIVATE -Wno-reserved-id-macro -Wno-keyword-macro)
if (USE_GDB_ADD_INDEX)
add_custom_command(TARGET clickhouse-odbc-bridge POST_BUILD COMMAND ${GDB_ADD_INDEX_EXE} ../clickhouse-odbc-bridge COMMENT "Adding .gdb-index to clickhouse-odbc-bridge" VERBATIM)
endif()
if (SPLIT_DEBUG_SYMBOLS) if (SPLIT_DEBUG_SYMBOLS)
clickhouse_split_debug_symbols(TARGET clickhouse-odbc-bridge DESTINATION_DIR ${CMAKE_CURRENT_BINARY_DIR}/../${SPLITTED_DEBUG_SYMBOLS_DIR} BINARY_PATH ../clickhouse-odbc-bridge) clickhouse_split_debug_symbols(TARGET clickhouse-odbc-bridge DESTINATION_DIR ${CMAKE_CURRENT_BINARY_DIR}/../${SPLITTED_DEBUG_SYMBOLS_DIR} BINARY_PATH ../clickhouse-odbc-bridge)
else() else()

View File

@ -11,13 +11,21 @@
M(ReplicatedSend, "Number of data parts being sent to replicas") \ M(ReplicatedSend, "Number of data parts being sent to replicas") \
M(ReplicatedChecks, "Number of data parts checking for consistency") \ M(ReplicatedChecks, "Number of data parts checking for consistency") \
M(BackgroundMergesAndMutationsPoolTask, "Number of active merges and mutations in an associated background pool") \ M(BackgroundMergesAndMutationsPoolTask, "Number of active merges and mutations in an associated background pool") \
M(BackgroundMergesAndMutationsPoolSize, "Limit on number of active merges and mutations in an associated background pool") \
M(BackgroundFetchesPoolTask, "Number of active fetches in an associated background pool") \ M(BackgroundFetchesPoolTask, "Number of active fetches in an associated background pool") \
M(BackgroundFetchesPoolSize, "Limit on number of simultaneous fetches in an associated background pool") \
M(BackgroundCommonPoolTask, "Number of active tasks in an associated background pool") \ M(BackgroundCommonPoolTask, "Number of active tasks in an associated background pool") \
M(BackgroundCommonPoolSize, "Limit on number of tasks in an associated background pool") \
M(BackgroundMovePoolTask, "Number of active tasks in BackgroundProcessingPool for moves") \ M(BackgroundMovePoolTask, "Number of active tasks in BackgroundProcessingPool for moves") \
M(BackgroundMovePoolSize, "Limit on number of tasks in BackgroundProcessingPool for moves") \
M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \ M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundSchedulePoolSize, "Limit on number of tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \ M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \
M(BackgroundBufferFlushSchedulePoolSize, "Limit on number of tasks in BackgroundBufferFlushSchedulePool") \
M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \ M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \
M(BackgroundDistributedSchedulePoolSize, "Limit on number of tasks in BackgroundDistributedSchedulePool") \
M(BackgroundMessageBrokerSchedulePoolTask, "Number of active tasks in BackgroundProcessingPool for message streaming") \ M(BackgroundMessageBrokerSchedulePoolTask, "Number of active tasks in BackgroundProcessingPool for message streaming") \
M(BackgroundMessageBrokerSchedulePoolSize, "Limit on number of tasks in BackgroundProcessingPool for message streaming") \
M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \ M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \
M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \ M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \
M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than the total size of currently merging parts.") \ M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than the total size of currently merging parts.") \

View File

@ -41,6 +41,16 @@ ZooKeeperLock::~ZooKeeperLock()
} }
} }
bool ZooKeeperLock::isLocked() const
{
return locked;
}
const std::string & ZooKeeperLock::getLockPath() const
{
return lock_path;
}
void ZooKeeperLock::unlock() void ZooKeeperLock::unlock()
{ {
if (!locked) if (!locked)

View File

@ -37,6 +37,8 @@ public:
void unlock(); void unlock();
bool tryLock(); bool tryLock();
bool isLocked() const;
const std::string & getLockPath() const;
private: private:
zkutil::ZooKeeperPtr zookeeper; zkutil::ZooKeeperPtr zookeeper;

View File

@ -149,8 +149,9 @@ Coordination::WatchCallback BackgroundSchedulePoolTaskInfo::getWatchCallback()
} }
BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_) BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, CurrentMetrics::Metric size_metric_, const char *thread_name_)
: tasks_metric(tasks_metric_) : tasks_metric(tasks_metric_)
, size_metric(size_metric_, size_)
, thread_name(thread_name_) , thread_name(thread_name_)
{ {
LOG_INFO(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size_); LOG_INFO(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size_);
@ -177,6 +178,8 @@ void BackgroundSchedulePool::increaseThreadsCount(size_t new_threads_count)
threads.resize(new_threads_count); threads.resize(new_threads_count);
for (size_t i = old_threads_count; i < new_threads_count; ++i) for (size_t i = old_threads_count; i < new_threads_count; ++i)
threads[i] = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); }); threads[i] = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
size_metric.changeTo(new_threads_count);
} }

View File

@ -54,7 +54,7 @@ public:
void increaseThreadsCount(size_t new_threads_count); void increaseThreadsCount(size_t new_threads_count);
/// thread_name_ cannot be longer then 13 bytes (2 bytes is reserved for "/D" suffix for delayExecutionThreadFunction()) /// thread_name_ cannot be longer then 13 bytes (2 bytes is reserved for "/D" suffix for delayExecutionThreadFunction())
BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_); BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, CurrentMetrics::Metric size_metric_, const char *thread_name_);
~BackgroundSchedulePool(); ~BackgroundSchedulePool();
private: private:
@ -91,6 +91,7 @@ private:
DelayedTasks delayed_tasks; DelayedTasks delayed_tasks;
CurrentMetrics::Metric tasks_metric; CurrentMetrics::Metric tasks_metric;
CurrentMetrics::Increment size_metric;
std::string thread_name; std::string thread_name;
}; };

View File

@ -83,4 +83,24 @@ DiskConfigurationPtr getDiskConfigurationFromAST(const std::string & root_name,
return conf; return conf;
} }
ASTs convertDiskConfigurationToAST(const Poco::Util::AbstractConfiguration & configuration, const std::string & config_path)
{
ASTs result;
Poco::Util::AbstractConfiguration::Keys keys;
configuration.keys(config_path, keys);
for (const auto & key : keys)
{
result.push_back(
makeASTFunction(
"equals",
std::make_shared<ASTIdentifier>(key),
std::make_shared<ASTLiteral>(configuration.getString(config_path + "." + key))));
}
return result;
}
} }

View File

@ -25,4 +25,12 @@ using DiskConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
*/ */
DiskConfigurationPtr getDiskConfigurationFromAST(const std::string & root_name, const ASTs & disk_args, ContextPtr context); DiskConfigurationPtr getDiskConfigurationFromAST(const std::string & root_name, const ASTs & disk_args, ContextPtr context);
/// The same as above function, but return XML::Document for easier modification of result configuration.
[[ maybe_unused ]] Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::string & root_name, const ASTs & disk_args, ContextPtr context);
/*
* A reverse function.
*/
[[ maybe_unused ]] ASTs convertDiskConfigurationToAST(const Poco::Util::AbstractConfiguration & configuration, const std::string & config_path);
} }

View File

@ -803,7 +803,7 @@ struct ConvertImpl<DataTypeEnum<FieldType>, DataTypeNumber<FieldType>, Name, Con
} }
}; };
static ColumnUInt8::MutablePtr copyNullMap(ColumnPtr col) static inline ColumnUInt8::MutablePtr copyNullMap(ColumnPtr col)
{ {
ColumnUInt8::MutablePtr null_map = nullptr; ColumnUInt8::MutablePtr null_map = nullptr;
if (const auto * col_null = checkAndGetColumn<ColumnNullable>(col.get())) if (const auto * col_null = checkAndGetColumn<ColumnNullable>(col.get()))

View File

@ -129,13 +129,21 @@ namespace CurrentMetrics
{ {
extern const Metric ContextLockWait; extern const Metric ContextLockWait;
extern const Metric BackgroundMovePoolTask; extern const Metric BackgroundMovePoolTask;
extern const Metric BackgroundMovePoolSize;
extern const Metric BackgroundSchedulePoolTask; extern const Metric BackgroundSchedulePoolTask;
extern const Metric BackgroundSchedulePoolSize;
extern const Metric BackgroundBufferFlushSchedulePoolTask; extern const Metric BackgroundBufferFlushSchedulePoolTask;
extern const Metric BackgroundBufferFlushSchedulePoolSize;
extern const Metric BackgroundDistributedSchedulePoolTask; extern const Metric BackgroundDistributedSchedulePoolTask;
extern const Metric BackgroundDistributedSchedulePoolSize;
extern const Metric BackgroundMessageBrokerSchedulePoolTask; extern const Metric BackgroundMessageBrokerSchedulePoolTask;
extern const Metric BackgroundMessageBrokerSchedulePoolSize;
extern const Metric BackgroundMergesAndMutationsPoolTask; extern const Metric BackgroundMergesAndMutationsPoolTask;
extern const Metric BackgroundMergesAndMutationsPoolSize;
extern const Metric BackgroundFetchesPoolTask; extern const Metric BackgroundFetchesPoolTask;
extern const Metric BackgroundFetchesPoolSize;
extern const Metric BackgroundCommonPoolTask; extern const Metric BackgroundCommonPoolTask;
extern const Metric BackgroundCommonPoolSize;
} }
namespace DB namespace DB
@ -2175,6 +2183,7 @@ BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
shared->buffer_flush_schedule_pool = std::make_unique<BackgroundSchedulePool>( shared->buffer_flush_schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_buffer_flush_schedule_pool_size, background_buffer_flush_schedule_pool_size,
CurrentMetrics::BackgroundBufferFlushSchedulePoolTask, CurrentMetrics::BackgroundBufferFlushSchedulePoolTask,
CurrentMetrics::BackgroundBufferFlushSchedulePoolSize,
"BgBufSchPool"); "BgBufSchPool");
} }
@ -2226,6 +2235,7 @@ BackgroundSchedulePool & Context::getSchedulePool() const
shared->schedule_pool = std::make_unique<BackgroundSchedulePool>( shared->schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_schedule_pool_size, background_schedule_pool_size,
CurrentMetrics::BackgroundSchedulePoolTask, CurrentMetrics::BackgroundSchedulePoolTask,
CurrentMetrics::BackgroundSchedulePoolSize,
"BgSchPool"); "BgSchPool");
} }
@ -2246,6 +2256,7 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool() const
shared->distributed_schedule_pool = std::make_unique<BackgroundSchedulePool>( shared->distributed_schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_distributed_schedule_pool_size, background_distributed_schedule_pool_size,
CurrentMetrics::BackgroundDistributedSchedulePoolTask, CurrentMetrics::BackgroundDistributedSchedulePoolTask,
CurrentMetrics::BackgroundDistributedSchedulePoolSize,
"BgDistSchPool"); "BgDistSchPool");
} }
@ -2266,6 +2277,7 @@ BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
shared->message_broker_schedule_pool = std::make_unique<BackgroundSchedulePool>( shared->message_broker_schedule_pool = std::make_unique<BackgroundSchedulePool>(
background_message_broker_schedule_pool_size, background_message_broker_schedule_pool_size,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask, CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolSize,
"BgMBSchPool"); "BgMBSchPool");
} }
@ -3826,6 +3838,7 @@ void Context::initializeBackgroundExecutorsIfNeeded()
/*max_threads_count*/background_pool_size, /*max_threads_count*/background_pool_size,
/*max_tasks_count*/background_pool_size * background_merges_mutations_concurrency_ratio, /*max_tasks_count*/background_pool_size * background_merges_mutations_concurrency_ratio,
CurrentMetrics::BackgroundMergesAndMutationsPoolTask, CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize,
background_merges_mutations_scheduling_policy background_merges_mutations_scheduling_policy
); );
LOG_INFO(shared->log, "Initialized background executor for merges and mutations with num_threads={}, num_tasks={}, scheduling_policy={}", LOG_INFO(shared->log, "Initialized background executor for merges and mutations with num_threads={}, num_tasks={}, scheduling_policy={}",
@ -3836,7 +3849,8 @@ void Context::initializeBackgroundExecutorsIfNeeded()
"Move", "Move",
background_move_pool_size, background_move_pool_size,
background_move_pool_size, background_move_pool_size,
CurrentMetrics::BackgroundMovePoolTask CurrentMetrics::BackgroundMovePoolTask,
CurrentMetrics::BackgroundMovePoolSize
); );
LOG_INFO(shared->log, "Initialized background executor for move operations with num_threads={}, num_tasks={}", background_move_pool_size, background_move_pool_size); LOG_INFO(shared->log, "Initialized background executor for move operations with num_threads={}, num_tasks={}", background_move_pool_size, background_move_pool_size);
@ -3845,7 +3859,8 @@ void Context::initializeBackgroundExecutorsIfNeeded()
"Fetch", "Fetch",
background_fetches_pool_size, background_fetches_pool_size,
background_fetches_pool_size, background_fetches_pool_size,
CurrentMetrics::BackgroundFetchesPoolTask CurrentMetrics::BackgroundFetchesPoolTask,
CurrentMetrics::BackgroundFetchesPoolSize
); );
LOG_INFO(shared->log, "Initialized background executor for fetches with num_threads={}, num_tasks={}", background_fetches_pool_size, background_fetches_pool_size); LOG_INFO(shared->log, "Initialized background executor for fetches with num_threads={}, num_tasks={}", background_fetches_pool_size, background_fetches_pool_size);
@ -3854,7 +3869,8 @@ void Context::initializeBackgroundExecutorsIfNeeded()
"Common", "Common",
background_common_pool_size, background_common_pool_size,
background_common_pool_size, background_common_pool_size,
CurrentMetrics::BackgroundCommonPoolTask CurrentMetrics::BackgroundCommonPoolTask,
CurrentMetrics::BackgroundCommonPoolSize
); );
LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}", background_common_pool_size, background_common_pool_size); LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}", background_common_pool_size, background_common_pool_size);

View File

@ -147,7 +147,6 @@ StoragePtr TemporaryTableHolder::getTable() const
return table; return table;
} }
void DatabaseCatalog::initializeAndLoadTemporaryDatabase() void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
{ {
drop_delay_sec = getContext()->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec); drop_delay_sec = getContext()->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);

View File

@ -235,6 +235,21 @@ public:
void checkTableCanBeRemovedOrRenamed(const StorageID & table_id, bool check_referential_dependencies, bool check_loading_dependencies, bool is_drop_database = false) const; void checkTableCanBeRemovedOrRenamed(const StorageID & table_id, bool check_referential_dependencies, bool check_loading_dependencies, bool is_drop_database = false) const;
struct TableMarkedAsDropped
{
StorageID table_id = StorageID::createEmpty();
StoragePtr table;
String metadata_path;
time_t drop_time{};
};
using TablesMarkedAsDropped = std::list<TableMarkedAsDropped>;
TablesMarkedAsDropped getTablesMarkedDropped()
{
std::lock_guard lock(tables_marked_dropped_mutex);
return tables_marked_dropped;
}
private: private:
// The global instance of database catalog. unique_ptr is to allow // The global instance of database catalog. unique_ptr is to allow
// deferred initialization. Thought I'd use std::optional, but I can't // deferred initialization. Thought I'd use std::optional, but I can't
@ -263,15 +278,6 @@ private:
return uuid.toUnderType().items[0] >> (64 - bits_for_first_level); return uuid.toUnderType().items[0] >> (64 - bits_for_first_level);
} }
struct TableMarkedAsDropped
{
StorageID table_id = StorageID::createEmpty();
StoragePtr table;
String metadata_path;
time_t drop_time{};
};
using TablesMarkedAsDropped = std::list<TableMarkedAsDropped>;
void dropTableDataTask(); void dropTableDataTask();
void dropTableFinally(const TableMarkedAsDropped & table); void dropTableFinally(const TableMarkedAsDropped & table);

View File

@ -107,39 +107,4 @@ void FillingRow::initFromDefaults(size_t from_pos)
row[i] = getFillDescription(i).fill_from; row[i] = getFillDescription(i).fill_from;
} }
void insertFromFillingRow(MutableColumns & filling_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns,
const FillingRow & filling_row, const Block & interpolate_block)
{
for (size_t i = 0, size = filling_columns.size(); i < size; ++i)
{
if (filling_row[i].isNull())
{
filling_columns[i]->insertDefault();
}
else
{
filling_columns[i]->insert(filling_row[i]);
}
}
if (size_t size = interpolate_block.columns())
{
Columns columns = interpolate_block.getColumns();
for (size_t i = 0; i < size; ++i)
interpolate_columns[i]->insertFrom(*columns[i]->convertToFullColumnIfConst(), 0);
}
else
for (const auto & interpolate_column : interpolate_columns)
interpolate_column->insertDefault();
for (const auto & other_column : other_columns)
other_column->insertDefault();
}
void copyRowFromColumns(MutableColumns & dest, const Columns & source, size_t row_num)
{
for (size_t i = 0, size = source.size(); i < size; ++i)
dest[i]->insertFrom(*source[i], row_num);
}
} }

View File

@ -39,8 +39,4 @@ private:
SortDescription sort_description; SortDescription sort_description;
}; };
void insertFromFillingRow(MutableColumns & filling_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns,
const FillingRow & filling_row, const Block & interpolate_block);
void copyRowFromColumns(MutableColumns & dest, const Columns & source, size_t row_num);
} }

View File

@ -2151,8 +2151,9 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_set("SET"); ParserKeyword s_set("SET");
ParserKeyword s_recompress("RECOMPRESS"); ParserKeyword s_recompress("RECOMPRESS");
ParserKeyword s_codec("CODEC"); ParserKeyword s_codec("CODEC");
ParserToken s_comma(TokenType::Comma); ParserKeyword s_materialize("MATERIALIZE");
ParserToken s_eq(TokenType::Equals); ParserKeyword s_remove("REMOVE");
ParserKeyword s_modify("MODIFY");
ParserIdentifier parser_identifier; ParserIdentifier parser_identifier;
ParserStringLiteral parser_string_literal; ParserStringLiteral parser_string_literal;
@ -2160,8 +2161,11 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserExpressionList parser_keys_list(false); ParserExpressionList parser_keys_list(false);
ParserCodec parser_codec; ParserCodec parser_codec;
ParserList parser_assignment_list( if (s_materialize.checkWithoutMoving(pos, expected) ||
std::make_unique<ParserAssignment>(), std::make_unique<ParserToken>(TokenType::Comma)); s_remove.checkWithoutMoving(pos, expected) ||
s_modify.checkWithoutMoving(pos, expected))
return false;
ASTPtr ttl_expr; ASTPtr ttl_expr;
if (!parser_exp.parse(pos, ttl_expr, expected)) if (!parser_exp.parse(pos, ttl_expr, expected))
@ -2219,6 +2223,9 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (s_set.ignore(pos)) if (s_set.ignore(pos))
{ {
ParserList parser_assignment_list(
std::make_unique<ParserAssignment>(), std::make_unique<ParserToken>(TokenType::Comma));
if (!parser_assignment_list.parse(pos, group_by_assignments, expected)) if (!parser_assignment_list.parse(pos, group_by_assignments, expected))
return false; return false;
} }

View File

@ -999,9 +999,6 @@ fileSegmentationEngineBSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t
"the value setting 'min_chunk_bytes_for_parallel_parsing' or check your data manually, most likely BSON is malformed", "the value setting 'min_chunk_bytes_for_parallel_parsing' or check your data manually, most likely BSON is malformed",
min_bytes, document_size); min_bytes, document_size);
if (document_size < sizeof(document_size))
throw ParsingException(ErrorCodes::INCORRECT_DATA, "Size of BSON document is invalid");
size_t old_size = memory.size(); size_t old_size = memory.size();
memory.resize(old_size + document_size); memory.resize(old_size + document_size);
memcpy(memory.data() + old_size, reinterpret_cast<char *>(&document_size), sizeof(document_size)); memcpy(memory.data() + old_size, reinterpret_cast<char *>(&document_size), sizeof(document_size));

View File

@ -259,37 +259,7 @@ IProcessor::Status FillingTransform::prepare()
return ISimpleTransform::prepare(); return ISimpleTransform::prepare();
} }
void FillingTransform::interpolate(const MutableColumns & result_columns, Block & interpolate_block)
void FillingTransform::transform(Chunk & chunk)
{
if (!chunk.hasRows() && !generate_suffix)
return;
Columns old_fill_columns;
Columns old_interpolate_columns;
Columns old_other_columns;
MutableColumns res_fill_columns;
MutableColumns res_interpolate_columns;
MutableColumns res_other_columns;
std::vector<std::pair<MutableColumns *, size_t>> res_map;
res_map.resize(input.getHeader().columns());
auto init_columns_by_positions = [&res_map](const Columns & old_columns, Columns & new_columns,
MutableColumns & new_mutable_columns, const Positions & positions)
{
for (size_t pos : positions)
{
auto old_column = old_columns[pos]->convertToFullColumnIfConst();
new_columns.push_back(old_column);
res_map[pos] = {&new_mutable_columns, new_mutable_columns.size()};
new_mutable_columns.push_back(old_column->cloneEmpty()->assumeMutable());
}
};
Block interpolate_block;
auto interpolate = [&]()
{ {
if (interpolate_description) if (interpolate_description)
{ {
@ -301,8 +271,8 @@ void FillingTransform::transform(Chunk & chunk)
for (const auto & [col_pos, name_type] : input_positions) for (const auto & [col_pos, name_type] : input_positions)
{ {
MutableColumnPtr column = name_type.type->createColumn(); MutableColumnPtr column = name_type.type->createColumn();
auto [res_columns, pos] = res_map[col_pos]; const auto * res_column = result_columns[col_pos].get();
size_t size = (*res_columns)[pos]->size(); size_t size = res_column->size();
if (size == 0) /// this is the first row in current chunk if (size == 0) /// this is the first row in current chunk
{ {
/// take value from last row of previous chunk if exists, else use default /// take value from last row of previous chunk if exists, else use default
@ -312,7 +282,7 @@ void FillingTransform::transform(Chunk & chunk)
column->insertDefault(); column->insertDefault();
} }
else /// take value from previous row of current chunk else /// take value from previous row of current chunk
column->insertFrom(*(*res_columns)[pos], size - 1); column->insertFrom(*res_column, size - 1);
interpolate_block.insert({std::move(column), name_type.type, name_type.name}); interpolate_block.insert({std::move(column), name_type.type, name_type.name});
} }
@ -324,41 +294,140 @@ void FillingTransform::transform(Chunk & chunk)
interpolate_actions->execute(interpolate_block, n); interpolate_actions->execute(interpolate_block, n);
} }
} }
}; }
using MutableColumnRawPtrs = std::vector<IColumn*>;
static void insertFromFillingRow(const MutableColumnRawPtrs & filling_columns, const MutableColumnRawPtrs & interpolate_columns, const MutableColumnRawPtrs & other_columns,
const FillingRow & filling_row, const Block & interpolate_block)
{
for (size_t i = 0, size = filling_columns.size(); i < size; ++i)
{
if (filling_row[i].isNull())
filling_columns[i]->insertDefault();
else
filling_columns[i]->insert(filling_row[i]);
}
if (size_t size = interpolate_block.columns())
{
Columns columns = interpolate_block.getColumns();
for (size_t i = 0; i < size; ++i)
interpolate_columns[i]->insertFrom(*columns[i]->convertToFullColumnIfConst(), 0);
}
else
for (auto * interpolate_column : interpolate_columns)
interpolate_column->insertDefault();
for (auto * other_column : other_columns)
other_column->insertDefault();
}
static void copyRowFromColumns(const MutableColumnRawPtrs & dest, const Columns & source, size_t row_num)
{
for (size_t i = 0, size = source.size(); i < size; ++i)
dest[i]->insertFrom(*source[i], row_num);
}
static void initColumnsByPositions(
const Columns & input_columns,
Columns & input_columns_by_positions,
const MutableColumns & output_columns,
MutableColumnRawPtrs & output_columns_by_position,
const std::vector<size_t> & positions)
{
for (size_t pos : positions)
{
input_columns_by_positions.push_back(input_columns[pos]);
output_columns_by_position.push_back(output_columns[pos].get());
}
}
void FillingTransform::initColumns(
const Columns & input_columns,
Columns & input_fill_columns,
Columns & input_interpolate_columns,
Columns & input_other_columns,
MutableColumns & output_columns,
MutableColumnRawPtrs & output_fill_columns,
MutableColumnRawPtrs & output_interpolate_columns,
MutableColumnRawPtrs & output_other_columns)
{
Columns non_const_columns;
non_const_columns.reserve(input_columns.size());
for (const auto & column : input_columns)
non_const_columns.push_back(column->convertToFullColumnIfConst());
for (const auto & column : non_const_columns)
output_columns.push_back(column->cloneEmpty()->assumeMutable());
initColumnsByPositions(non_const_columns, input_fill_columns, output_columns, output_fill_columns, fill_column_positions);
initColumnsByPositions(
non_const_columns, input_interpolate_columns, output_columns, output_interpolate_columns, interpolate_column_positions);
initColumnsByPositions(non_const_columns, input_other_columns, output_columns, output_other_columns, other_column_positions);
}
void FillingTransform::transform(Chunk & chunk)
{
if (!chunk.hasRows() && !generate_suffix)
return;
Columns old_fill_columns;
Columns old_interpolate_columns;
Columns old_other_columns;
MutableColumnRawPtrs res_fill_columns;
MutableColumnRawPtrs res_interpolate_columns;
MutableColumnRawPtrs res_other_columns;
MutableColumns result_columns;
Block interpolate_block;
if (generate_suffix) if (generate_suffix)
{ {
const auto & empty_columns = input.getHeader().getColumns(); const auto & empty_columns = input.getHeader().getColumns();
init_columns_by_positions(empty_columns, old_fill_columns, res_fill_columns, fill_column_positions); initColumns(
init_columns_by_positions(empty_columns, old_interpolate_columns, res_interpolate_columns, interpolate_column_positions); empty_columns,
init_columns_by_positions(empty_columns, old_other_columns, res_other_columns, other_column_positions); old_fill_columns,
old_interpolate_columns,
old_other_columns,
result_columns,
res_fill_columns,
res_interpolate_columns,
res_other_columns);
if (first) if (first)
filling_row.initFromDefaults(); filling_row.initFromDefaults();
if (should_insert_first && filling_row < next_row) if (should_insert_first && filling_row < next_row)
{ {
interpolate(); interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block); insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
} }
interpolate(); interpolate(result_columns, interpolate_block);
while (filling_row.next(next_row)) while (filling_row.next(next_row))
{ {
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block); insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
interpolate(); interpolate(result_columns, interpolate_block);
} }
setResultColumns(chunk, res_fill_columns, res_interpolate_columns, res_other_columns); size_t num_output_rows = result_columns[0]->size();
chunk.setColumns(std::move(result_columns), num_output_rows);
return; return;
} }
size_t num_rows = chunk.getNumRows(); const size_t num_rows = chunk.getNumRows();
auto old_columns = chunk.detachColumns(); auto old_columns = chunk.detachColumns();
initColumns(
init_columns_by_positions(old_columns, old_fill_columns, res_fill_columns, fill_column_positions); old_columns,
init_columns_by_positions(old_columns, old_interpolate_columns, res_interpolate_columns, interpolate_column_positions); old_fill_columns,
init_columns_by_positions(old_columns, old_other_columns, res_other_columns, other_column_positions); old_interpolate_columns,
old_other_columns,
result_columns,
res_fill_columns,
res_interpolate_columns,
res_other_columns);
if (first) if (first)
{ {
@ -372,7 +441,7 @@ void FillingTransform::transform(Chunk & chunk)
filling_row.initFromDefaults(i); filling_row.initFromDefaults(i);
if (less(fill_from, current_value, filling_row.getDirection(i))) if (less(fill_from, current_value, filling_row.getDirection(i)))
{ {
interpolate(); interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block); insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
} }
break; break;
@ -386,7 +455,7 @@ void FillingTransform::transform(Chunk & chunk)
{ {
should_insert_first = next_row < filling_row; should_insert_first = next_row < filling_row;
for (size_t i = 0; i < filling_row.size(); ++i) for (size_t i = 0, size = filling_row.size(); i < size; ++i)
{ {
auto current_value = (*old_fill_columns[i])[row_ind]; auto current_value = (*old_fill_columns[i])[row_ind];
const auto & fill_to = filling_row.getFillDescription(i).fill_to; const auto & fill_to = filling_row.getFillDescription(i).fill_to;
@ -401,15 +470,15 @@ void FillingTransform::transform(Chunk & chunk)
/// and probably we need to insert it to block. /// and probably we need to insert it to block.
if (should_insert_first && filling_row < next_row) if (should_insert_first && filling_row < next_row)
{ {
interpolate(); interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block); insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
} }
interpolate(); interpolate(result_columns, interpolate_block);
while (filling_row.next(next_row)) while (filling_row.next(next_row))
{ {
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block); insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
interpolate(); interpolate(result_columns, interpolate_block);
} }
copyRowFromColumns(res_fill_columns, old_fill_columns, row_ind); copyRowFromColumns(res_fill_columns, old_fill_columns, row_ind);
@ -417,55 +486,24 @@ void FillingTransform::transform(Chunk & chunk)
copyRowFromColumns(res_other_columns, old_other_columns, row_ind); copyRowFromColumns(res_other_columns, old_other_columns, row_ind);
} }
saveLastRow(res_fill_columns, res_interpolate_columns, res_other_columns); saveLastRow(result_columns);
setResultColumns(chunk, res_fill_columns, res_interpolate_columns, res_other_columns); size_t num_output_rows = result_columns[0]->size();
chunk.setColumns(std::move(result_columns), num_output_rows);
} }
void FillingTransform::setResultColumns(Chunk & chunk, MutableColumns & fill_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns) const void FillingTransform::saveLastRow(const MutableColumns & result_columns)
{
MutableColumns result_columns(fill_columns.size() + interpolate_columns.size() + other_columns.size());
/// fill_columns always non-empty.
size_t num_rows = fill_columns[0]->size();
for (size_t i = 0, size = fill_columns.size(); i < size; ++i)
result_columns[fill_column_positions[i]] = std::move(fill_columns[i]);
for (size_t i = 0, size = interpolate_columns.size(); i < size; ++i)
result_columns[interpolate_column_positions[i]] = std::move(interpolate_columns[i]);
for (size_t i = 0, size = other_columns.size(); i < size; ++i)
result_columns[other_column_positions[i]] = std::move(other_columns[i]);
chunk.setColumns(std::move(result_columns), num_rows);
}
void FillingTransform::saveLastRow(const MutableColumns & fill_columns, const MutableColumns & interpolate_columns, const MutableColumns & other_columns)
{ {
last_row.clear(); last_row.clear();
last_row.resize(fill_columns.size() + interpolate_columns.size() + other_columns.size());
size_t num_rows = fill_columns[0]->size(); const size_t num_rows = result_columns[0]->size();
if (num_rows == 0) if (num_rows == 0)
return; return;
for (size_t i = 0, size = fill_columns.size(); i < size; ++i) for (const auto & result_column : result_columns)
{ {
auto column = fill_columns[i]->cloneEmpty(); auto column = result_column->cloneEmpty();
column->insertFrom(*fill_columns[i], num_rows - 1); column->insertFrom(*result_column, num_rows - 1);
last_row[fill_column_positions[i]] = std::move(column); last_row.push_back(std::move(column));
}
for (size_t i = 0, size = interpolate_columns.size(); i < size; ++i)
{
auto column = interpolate_columns[i]->cloneEmpty();
column->insertFrom(*interpolate_columns[i], num_rows - 1);
last_row[interpolate_column_positions[i]] = std::move(column);
}
for (size_t i = 0, size = other_columns.size(); i < size; ++i)
{
auto column = other_columns[i]->cloneEmpty();
column->insertFrom(*other_columns[i], num_rows - 1);
last_row[other_column_positions[i]] = std::move(column);
} }
} }
} }

View File

@ -28,8 +28,19 @@ protected:
void transform(Chunk & Chunk) override; void transform(Chunk & Chunk) override;
private: private:
void setResultColumns(Chunk & chunk, MutableColumns & fill_columns, MutableColumns & interpolate_columns, MutableColumns & other_columns) const; void saveLastRow(const MutableColumns & result_columns);
void saveLastRow(const MutableColumns & fill_columns, const MutableColumns & interpolate_columns, const MutableColumns & other_columns); void interpolate(const MutableColumns& result_columns, Block & interpolate_block);
using MutableColumnRawPtrs = std::vector<IColumn *>;
void initColumns(
const Columns & input_columns,
Columns & input_fill_columns,
Columns & input_interpolate_columns,
Columns & input_other_columns,
MutableColumns & output_columns,
MutableColumnRawPtrs & output_fill_columns,
MutableColumnRawPtrs & output_interpolate_columns,
MutableColumnRawPtrs & output_other_columns);
const SortDescription sort_description; /// Contains only columns with WITH FILL. const SortDescription sort_description; /// Contains only columns with WITH FILL.
const InterpolateDescriptionPtr interpolate_description; /// Contains INTERPOLATE columns const InterpolateDescriptionPtr interpolate_description; /// Contains INTERPOLATE columns

View File

@ -218,9 +218,10 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk); zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);
if (!zero_copy_lock) if (!zero_copy_lock || !zero_copy_lock->isLocked())
{ {
LOG_DEBUG(log, "Merge of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name); LOG_DEBUG(log, "Merge of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name);
storage.watchZeroCopyLock(entry.new_part_name, disk);
/// Don't check for missing part -- it's missing because other replica still not /// Don't check for missing part -- it's missing because other replica still not
/// finished merge. /// finished merge.
return PrepareResult{ return PrepareResult{

View File

@ -59,6 +59,7 @@ void MergeTreeBackgroundExecutor<Queue>::increaseThreadsAndMaxTasksCount(size_t
for (size_t number = threads_count; number < new_threads_count; ++number) for (size_t number = threads_count; number < new_threads_count; ++number)
pool.scheduleOrThrowOnError([this] { threadFunction(); }); pool.scheduleOrThrowOnError([this] { threadFunction(); });
max_tasks_metric.changeTo(2 * new_max_tasks_count); // pending + active
max_tasks_count.store(new_max_tasks_count, std::memory_order_relaxed); max_tasks_count.store(new_max_tasks_count, std::memory_order_relaxed);
threads_count = new_threads_count; threads_count = new_threads_count;
} }

View File

@ -13,6 +13,7 @@
#include <boost/circular_buffer.hpp> #include <boost/circular_buffer.hpp>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <Common/CurrentMetrics.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
@ -247,11 +248,13 @@ public:
String name_, String name_,
size_t threads_count_, size_t threads_count_,
size_t max_tasks_count_, size_t max_tasks_count_,
CurrentMetrics::Metric metric_) CurrentMetrics::Metric metric_,
CurrentMetrics::Metric max_tasks_metric_)
: name(name_) : name(name_)
, threads_count(threads_count_) , threads_count(threads_count_)
, max_tasks_count(max_tasks_count_) , max_tasks_count(max_tasks_count_)
, metric(metric_) , metric(metric_)
, max_tasks_metric(max_tasks_metric_, 2 * max_tasks_count) // active + pending
{ {
if (max_tasks_count == 0) if (max_tasks_count == 0)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Task count for MergeTreeBackgroundExecutor must not be zero"); throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Task count for MergeTreeBackgroundExecutor must not be zero");
@ -272,9 +275,10 @@ public:
size_t threads_count_, size_t threads_count_,
size_t max_tasks_count_, size_t max_tasks_count_,
CurrentMetrics::Metric metric_, CurrentMetrics::Metric metric_,
CurrentMetrics::Metric max_tasks_metric_,
std::string_view policy) std::string_view policy)
requires requires(Queue queue) { queue.updatePolicy(policy); } // Because we use explicit template instantiation requires requires(Queue queue) { queue.updatePolicy(policy); } // Because we use explicit template instantiation
: MergeTreeBackgroundExecutor(name_, threads_count_, max_tasks_count_, metric_) : MergeTreeBackgroundExecutor(name_, threads_count_, max_tasks_count_, metric_, max_tasks_metric_)
{ {
pending.updatePolicy(policy); pending.updatePolicy(policy);
} }
@ -311,6 +315,7 @@ private:
size_t threads_count TSA_GUARDED_BY(mutex) = 0; size_t threads_count TSA_GUARDED_BY(mutex) = 0;
std::atomic<size_t> max_tasks_count = 0; std::atomic<size_t> max_tasks_count = 0;
CurrentMetrics::Metric metric; CurrentMetrics::Metric metric;
CurrentMetrics::Increment max_tasks_metric;
void routine(TaskRuntimeDataPtr item); void routine(TaskRuntimeDataPtr item);

View File

@ -7484,7 +7484,7 @@ MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts,
if (moving_tagger->parts_to_move.empty()) if (moving_tagger->parts_to_move.empty())
return MovePartsOutcome::NothingToMove; return MovePartsOutcome::NothingToMove;
return moveParts(moving_tagger); return moveParts(moving_tagger, true);
} }
MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove() MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove()
@ -7539,7 +7539,7 @@ MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::checkPartsForMove(co
return std::make_shared<CurrentlyMovingPartsTagger>(std::move(parts_to_move), *this); return std::make_shared<CurrentlyMovingPartsTagger>(std::move(parts_to_move), *this);
} }
MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger) MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, bool wait_for_move_if_zero_copy)
{ {
LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size()); LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size());
@ -7587,22 +7587,42 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
/// be fixed. /// be fixed.
auto disk = moving_part.reserved_space->getDisk(); auto disk = moving_part.reserved_space->getDisk();
if (supportsReplication() && disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication) if (supportsReplication() && disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
{
/// This loop is not endless, if shutdown called/connection failed/replica became readonly
/// we will return true from waitZeroCopyLock and createZeroCopyLock will return nullopt.
while (true)
{ {
/// If we acquired lock than let's try to move. After one /// If we acquired lock than let's try to move. After one
/// replica will actually move the part from disk to some /// replica will actually move the part from disk to some
/// zero-copy storage other replicas will just fetch /// zero-copy storage other replicas will just fetch
/// metainformation. /// metainformation.
if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock) if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock)
{
if (lock->isLocked())
{ {
cloned_part = parts_mover.clonePart(moving_part); cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part); parts_mover.swapClonedPart(cloned_part);
break;
}
else if (wait_for_move_if_zero_copy)
{
LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name);
/// Wait and checks not only for timeout but also for shutdown and so on.
while (!waitZeroCopyLockToDisappear(*lock, 3000))
{
LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name);
}
}
else
break;
} }
else else
{ {
/// Move will be retried but with backoff. /// Move will be retried but with backoff.
LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name); LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name);
result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy; result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
continue; break;
}
} }
} }
else /// Ordinary move as it should be else /// Ordinary move as it should be

View File

@ -1456,7 +1456,7 @@ private:
using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>; using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>;
/// Move selected parts to corresponding disks /// Move selected parts to corresponding disks
MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger); MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, bool wait_for_move_if_zero_copy=false);
/// Select parts for move and disks for them. Used in background moving processes. /// Select parts for move and disks for them. Used in background moving processes.
CurrentlyMovingPartsTaggerPtr selectPartsForMove(); CurrentlyMovingPartsTaggerPtr selectPartsForMove();
@ -1511,6 +1511,7 @@ private:
/// Create zero-copy exclusive lock for part and disk. Useful for coordination of /// Create zero-copy exclusive lock for part and disk. Useful for coordination of
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree. /// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; } virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }
virtual bool waitZeroCopyLockToDisappear(const ZeroCopyLock &, size_t) { return false; }
/// Remove parts from disk calling part->remove(). Can do it in parallel in case of big set of parts and enabled settings. /// Remove parts from disk calling part->remove(). Can do it in parallel in case of big set of parts and enabled settings.
/// If we fail to remove some part and throw_on_error equal to `true` will throw an exception on the first failed part. /// If we fail to remove some part and throw_on_error equal to `true` will throw an exception on the first failed part.

View File

@ -127,9 +127,11 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk); zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);
if (!zero_copy_lock) if (!zero_copy_lock || !zero_copy_lock->isLocked())
{ {
storage.watchZeroCopyLock(entry.new_part_name, disk);
LOG_DEBUG(log, "Mutation of part {} started by some other replica, will wait it and mutated merged part", entry.new_part_name); LOG_DEBUG(log, "Mutation of part {} started by some other replica, will wait it and mutated merged part", entry.new_part_name);
return PrepareResult{ return PrepareResult{
.prepared_successfully = false, .prepared_successfully = false,
.need_to_check_missing_part_in_fetch = false, .need_to_check_missing_part_in_fetch = false,

View File

@ -30,7 +30,9 @@ public:
UInt64 getPriority() override { return priority; } UInt64 getPriority() override { return priority; }
private: private:
ReplicatedMergeMutateTaskBase::PrepareResult prepare() override; ReplicatedMergeMutateTaskBase::PrepareResult prepare() override;
bool finalize(ReplicatedMergeMutateTaskBase::PartLogWriter write_part_log) override; bool finalize(ReplicatedMergeMutateTaskBase::PartLogWriter write_part_log) override;
bool executeInnerTask() override bool executeInnerTask() override

View File

@ -1370,6 +1370,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{ {
constexpr auto fmt_string = "Not executing merge/mutation for the part {}, waiting for {} to execute it and will fetch after."; constexpr auto fmt_string = "Not executing merge/mutation for the part {}, waiting for {} to execute it and will fetch after.";
out_postpone_reason = fmt::format(fmt_string, entry.new_part_name, replica_to_execute_merge); out_postpone_reason = fmt::format(fmt_string, entry.new_part_name, replica_to_execute_merge);
LOG_TEST(log, fmt_string, entry.new_part_name, replica_to_execute_merge);
return false; return false;
} }
} }

View File

@ -14,6 +14,7 @@ struct ZeroCopyLock
{ {
ZeroCopyLock(const zkutil::ZooKeeperPtr & zookeeper, const std::string & lock_path, const std::string & lock_message); ZeroCopyLock(const zkutil::ZooKeeperPtr & zookeeper, const std::string & lock_path, const std::string & lock_message);
bool isLocked() const { return lock->isLocked(); }
/// Actual lock /// Actual lock
std::unique_ptr<zkutil::ZooKeeperLock> lock; std::unique_ptr<zkutil::ZooKeeperLock> lock;
}; };

View File

@ -15,6 +15,7 @@ using namespace DB;
namespace CurrentMetrics namespace CurrentMetrics
{ {
extern const Metric BackgroundMergesAndMutationsPoolTask; extern const Metric BackgroundMergesAndMutationsPoolTask;
extern const Metric BackgroundMergesAndMutationsPoolSize;
} }
std::random_device device; std::random_device device;
@ -102,7 +103,8 @@ TEST(Executor, Simple)
"GTest", "GTest",
1, // threads 1, // threads
100, // max_tasks 100, // max_tasks
CurrentMetrics::BackgroundMergesAndMutationsPoolTask CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
); );
String schedule; // mutex is not required because we have a single worker String schedule; // mutex is not required because we have a single worker
@ -144,7 +146,8 @@ TEST(Executor, RemoveTasks)
"GTest", "GTest",
tasks_kinds, tasks_kinds,
tasks_kinds * batch, tasks_kinds * batch,
CurrentMetrics::BackgroundMergesAndMutationsPoolTask CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
); );
for (size_t i = 0; i < batch; ++i) for (size_t i = 0; i < batch; ++i)
@ -184,7 +187,8 @@ TEST(Executor, RemoveTasksStress)
"GTest", "GTest",
tasks_kinds, tasks_kinds,
tasks_kinds * batch * (schedulers_count + removers_count), tasks_kinds * batch * (schedulers_count + removers_count),
CurrentMetrics::BackgroundMergesAndMutationsPoolTask CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
); );
std::barrier barrier(schedulers_count + removers_count); std::barrier barrier(schedulers_count + removers_count);
@ -234,7 +238,8 @@ TEST(Executor, UpdatePolicy)
"GTest", "GTest",
1, // threads 1, // threads
100, // max_tasks 100, // max_tasks
CurrentMetrics::BackgroundMergesAndMutationsPoolTask CurrentMetrics::BackgroundMergesAndMutationsPoolTask,
CurrentMetrics::BackgroundMergesAndMutationsPoolSize
); );
String schedule; // mutex is not required because we have a single worker String schedule; // mutex is not required because we have a single worker

View File

@ -8555,7 +8555,6 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
return best_replica; return best_replica;
} }
Strings StorageReplicatedMergeTree::getZeroCopyPartPath( Strings StorageReplicatedMergeTree::getZeroCopyPartPath(
const MergeTreeSettings & settings, const std::string & disk_type, const String & table_uuid, const MergeTreeSettings & settings, const std::string & disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old) const String & part_name, const String & zookeeper_path_old)
@ -8575,19 +8574,66 @@ Strings StorageReplicatedMergeTree::getZeroCopyPartPath(
return res; return res;
} }
bool StorageReplicatedMergeTree::checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica) void StorageReplicatedMergeTree::watchZeroCopyLock(const String & part_name, const DiskPtr & disk)
{ {
auto path = getZeroCopyPartPath(part_name, disk); auto path = getZeroCopyPartPath(part_name, disk);
if (path) if (path)
{ {
/// FIXME auto zookeeper = getZooKeeper();
auto lock_path = fs::path(*path) / "part_exclusive_lock"; auto lock_path = fs::path(*path) / "part_exclusive_lock";
if (getZooKeeper()->tryGet(lock_path, lock_replica)) LOG_TEST(log, "Adding zero-copy lock on {}", lock_path);
/// Looks ugly, but we cannot touch any storage fields inside Watch callback
/// because it could lead to use-after-free (storage dropped and watch triggered)
std::shared_ptr<std::atomic<bool>> flag = std::make_shared<std::atomic<bool>>(true);
std::string replica;
bool exists = zookeeper->tryGetWatch(lock_path, replica, nullptr, [flag] (const Coordination::WatchResponse &)
{ {
*flag = false;
});
if (exists)
{
std::lock_guard lock(existing_zero_copy_locks_mutex);
existing_zero_copy_locks[lock_path] = ZeroCopyLockDescription{replica, flag};
}
}
}
bool StorageReplicatedMergeTree::checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica)
{
auto path = getZeroCopyPartPath(part_name, disk);
std::lock_guard lock(existing_zero_copy_locks_mutex);
/// Cleanup abandoned locks during each check. The set of locks is small and this is quite fast loop.
/// Also it's hard to properly remove locks because we can execute replication queue
/// in arbitrary order and some parts can be replaced by covering parts without merges.
for (auto it = existing_zero_copy_locks.begin(); it != existing_zero_copy_locks.end();)
{
if (*it->second.exists)
++it;
else
{
LOG_TEST(log, "Removing zero-copy lock on {}", it->first);
it = existing_zero_copy_locks.erase(it);
}
}
if (path)
{
auto lock_path = fs::path(*path) / "part_exclusive_lock";
if (auto it = existing_zero_copy_locks.find(lock_path); it != existing_zero_copy_locks.end())
{
lock_replica = it->second.replica;
if (*it->second.exists)
{
LOG_TEST(log, "Zero-copy lock on path {} exists", it->first);
return true; return true;
} }
} }
LOG_TEST(log, "Zero-copy lock on path {} doesn't exist", lock_path);
}
return false; return false;
} }
@ -8599,11 +8645,37 @@ std::optional<String> StorageReplicatedMergeTree::getZeroCopyPartPath(const Stri
return getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(), part_name, zookeeper_path)[0]; return getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(), part_name, zookeeper_path)[0];
} }
bool StorageReplicatedMergeTree::waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait)
{
if (lock.isLocked())
return true;
if (partial_shutdown_called.load(std::memory_order_relaxed))
return true;
auto lock_path = lock.lock->getLockPath();
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return true;
Stopwatch time_waiting;
const auto & stop_waiting = [&]()
{
bool timeout_exceeded = milliseconds_to_wait < time_waiting.elapsedMilliseconds();
return partial_shutdown_called.load(std::memory_order_relaxed) || is_readonly.load(std::memory_order_relaxed) || timeout_exceeded;
};
return zookeeper->waitForDisappear(lock_path, stop_waiting);
}
std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk)
{ {
if (!disk || !disk->supportZeroCopyReplication()) if (!disk || !disk->supportZeroCopyReplication())
return std::nullopt; return std::nullopt;
if (partial_shutdown_called.load(std::memory_order_relaxed) || is_readonly.load(std::memory_order_relaxed))
return std::nullopt;
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper(); zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper) if (!zookeeper)
return std::nullopt; return std::nullopt;
@ -8616,10 +8688,8 @@ std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusi
/// Create actual lock /// Create actual lock
ZeroCopyLock lock(zookeeper, zc_zookeeper_path, replica_name); ZeroCopyLock lock(zookeeper, zc_zookeeper_path, replica_name);
if (lock.lock->tryLock()) lock.lock->tryLock();
return lock; return lock;
else
return std::nullopt;
} }
String StorageReplicatedMergeTree::findReplicaHavingPart( String StorageReplicatedMergeTree::findReplicaHavingPart(

View File

@ -482,6 +482,16 @@ private:
std::mutex last_broken_disks_mutex; std::mutex last_broken_disks_mutex;
std::set<String> last_broken_disks; std::set<String> last_broken_disks;
std::mutex existing_zero_copy_locks_mutex;
struct ZeroCopyLockDescription
{
std::string replica;
std::shared_ptr<std::atomic<bool>> exists;
};
std::unordered_map<String, ZeroCopyLockDescription> existing_zero_copy_locks;
static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context); static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);
template <class Func> template <class Func>
@ -862,13 +872,19 @@ private:
void createTableSharedID() const; void createTableSharedID() const;
bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica); bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica);
void watchZeroCopyLock(const String & part_name, const DiskPtr & disk);
std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk); std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk);
/// Create ephemeral lock in zookeeper for part and disk which support zero copy replication. /// Create ephemeral lock in zookeeper for part and disk which support zero copy replication.
/// If somebody already holding the lock -- return std::nullopt. /// If no connection to zookeeper, shutdown, readonly -- return std::nullopt.
/// If somebody already holding the lock -- return unlocked ZeroCopyLock object (not std::nullopt).
std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override; std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override;
/// Wait for ephemral lock to disappear. Return true if table shutdown/readonly/timeout exceeded, etc.
/// Or if node actually disappeared.
bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override;
void startupImpl(bool from_attach_thread); void startupImpl(bool from_attach_thread);
}; };

View File

@ -0,0 +1,64 @@
#include <Storages/System/StorageSystemMarkedDroppedTables.h>

#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeUUID.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <base/types.h>
namespace DB
{
/// Column layout of system.marked_dropped_tables: one row per table that has been
/// dropped but whose data is not yet permanently removed from disk.
NamesAndTypesList StorageSystemMarkedDroppedTables::getNamesAndTypes()
{
    /// Order here must match the fill order in fillData().
    return {
        {"index", std::make_shared<DataTypeUInt32>()},
        {"database", std::make_shared<DataTypeString>()},
        {"table", std::make_shared<DataTypeString>()},
        {"uuid", std::make_shared<DataTypeUUID>()},
        {"engine", std::make_shared<DataTypeString>()},
        {"metadata_dropped_path", std::make_shared<DataTypeString>()},
        {"table_dropped_time", std::make_shared<DataTypeDateTime>()},
    };
}
/// Populate the result columns from DatabaseCatalog's list of tables marked as dropped.
/// Columns are filled in the same order they are declared in getNamesAndTypes().
void StorageSystemMarkedDroppedTables::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
{
    const auto dropped_tables = DatabaseCatalog::instance().getTablesMarkedDropped();

    size_t col = 0;
    auto & col_index = assert_cast<ColumnUInt32 &>(*res_columns[col++]);
    auto & col_database = assert_cast<ColumnString &>(*res_columns[col++]);
    auto & col_table = assert_cast<ColumnString &>(*res_columns[col++]);
    auto & col_uuid = assert_cast<ColumnUUID &>(*res_columns[col++]).getData();
    auto & col_engine = assert_cast<ColumnString &>(*res_columns[col++]);
    auto & col_metadata_dropped_path = assert_cast<ColumnString &>(*res_columns[col++]);
    auto & col_table_dropped_time = assert_cast<ColumnUInt32 &>(*res_columns[col++]);

    UInt32 row = 0;
    for (const auto & dropped : dropped_tables)
    {
        col_index.insertValue(row++);

        const auto & database_name = dropped.table_id.getDatabaseName();
        col_database.insertData(database_name.data(), database_name.size());

        const auto & table_name = dropped.table_id.getTableName();
        col_table.insertData(table_name.data(), table_name.size());

        col_uuid.push_back(dropped.table_id.uuid.toUnderType());

        /// The IStorage object may already be gone; emit an empty engine name then.
        if (dropped.table)
        {
            const auto & engine_name = dropped.table->getName();
            col_engine.insertData(engine_name.data(), engine_name.size());
        }
        else
            col_engine.insertData({}, 0);

        col_metadata_dropped_path.insertData(dropped.metadata_path.data(), dropped.metadata_path.size());
        col_table_dropped_time.insertValue(static_cast<UInt32>(dropped.drop_time));
    }
}
}

View File

@ -0,0 +1,20 @@
#pragma once
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
/// Implements `system.marked_dropped_tables`: a one-block system table listing
/// tables that were dropped but whose data has not yet been permanently removed.
class StorageSystemMarkedDroppedTables final : public IStorageSystemOneBlock<StorageSystemMarkedDroppedTables>
{
public:
    std::string getName() const override { return "SystemMarkedDroppedTables"; }
    /// Column names/types of the table; must stay in sync with fillData().
    static NamesAndTypesList getNamesAndTypes();
protected:
    using IStorageSystemOneBlock::IStorageSystemOneBlock;
    /// Fills one row per entry of DatabaseCatalog::getTablesMarkedDropped().
    void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
};
}

View File

@ -79,6 +79,7 @@
#include <Storages/System/StorageSystemRemoteDataPaths.h> #include <Storages/System/StorageSystemRemoteDataPaths.h>
#include <Storages/System/StorageSystemCertificates.h> #include <Storages/System/StorageSystemCertificates.h>
#include <Storages/System/StorageSystemSchemaInferenceCache.h> #include <Storages/System/StorageSystemSchemaInferenceCache.h>
#include <Storages/System/StorageSystemMarkedDroppedTables.h>
#ifdef OS_LINUX #ifdef OS_LINUX
#include <Storages/System/StorageSystemStackTrace.h> #include <Storages/System/StorageSystemStackTrace.h>
@ -140,6 +141,7 @@ void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database)
attach<StorageSystemTimeZones>(context, system_database, "time_zones"); attach<StorageSystemTimeZones>(context, system_database, "time_zones");
attach<StorageSystemBackups>(context, system_database, "backups"); attach<StorageSystemBackups>(context, system_database, "backups");
attach<StorageSystemSchemaInferenceCache>(context, system_database, "schema_inference_cache"); attach<StorageSystemSchemaInferenceCache>(context, system_database, "schema_inference_cache");
attach<StorageSystemMarkedDroppedTables>(context, system_database, "marked_dropped_tables");
#ifdef OS_LINUX #ifdef OS_LINUX
attach<StorageSystemStackTrace>(context, system_database, "stack_trace"); attach<StorageSystemStackTrace>(context, system_database, "stack_trace");
#endif #endif

View File

@ -1,4 +1,8 @@
<clickhouse> <clickhouse>
<logger>
<level>test</level>
</logger>
<storage_configuration> <storage_configuration>
<disks> <disks>
<s3> <s3>
@ -21,6 +25,13 @@
</main> </main>
</volumes> </volumes>
</s3> </s3>
<s3_only>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3_only>
</policies> </policies>
</storage_configuration> </storage_configuration>

View File

@ -5,6 +5,7 @@ import random
import string import string
import time import time
from multiprocessing.dummy import Pool
import pytest import pytest
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
@ -102,3 +103,133 @@ SETTINGS index_granularity = 8192, storage_policy = 's3'"""
assert part_to_disk["20230102_0_0_0"] == "s3" assert part_to_disk["20230102_0_0_0"] == "s3"
assert part_to_disk["20230109_0_0_0"] == "s3" assert part_to_disk["20230109_0_0_0"] == "s3"
assert part_to_disk["20230116_0_0_0"] == "default" assert part_to_disk["20230116_0_0_0"] == "default"
def test_concurrent_move_to_s3(started_cluster):
    """Move every partition to S3 from both replicas concurrently and verify
    that no exceptions are raised and all active parts end up on the s3 disk.

    Fix vs. original: the ``move_partition_to_s3`` helper was redefined inside
    the loop and closed over the loop variable ``i`` (a late-binding hazard when
    dispatched asynchronously to a pool). It is now defined once and receives
    the partition explicitly.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
    node1.query(
        """
CREATE TABLE test_concurrent_move (EventDate Date, CounterID UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_concurrent_move', 'r1')
PARTITION BY CounterID
ORDER BY (CounterID, EventDate)
SETTINGS index_granularity = 8192, storage_policy = 's3'"""
    )

    node2.query(
        """
CREATE TABLE test_concurrent_move (EventDate Date, CounterID UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_concurrent_move', 'r2')
PARTITION BY CounterID
ORDER BY (CounterID, EventDate)
SETTINGS index_granularity = 8192, storage_policy = 's3'"""
    )
    partitions = range(10)

    for i in partitions:
        node1.query(
            f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number), {i} from system.numbers limit 20"
        )
        node1.query(
            f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
        )
        node1.query(
            f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
        )
        node1.query(
            f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
        )

    node2.query("SYSTEM SYNC REPLICA test_concurrent_move")

    # Helper defined once; partition is passed explicitly to avoid closing over
    # the loop variable (late binding) when the call runs on a pool thread.
    def move_partition_to_s3(node, partition):
        node.query(
            f"ALTER TABLE test_concurrent_move MOVE PARTITION '{partition}' TO DISK 's3'"
        )

    # Check that we can move parts concurrently without exceptions.
    p = Pool(3)
    for i in partitions:
        j1 = p.apply_async(move_partition_to_s3, (node1, i))
        j2 = p.apply_async(move_partition_to_s3, (node2, i))
        j1.get()
        j2.get()

    def get_part_to_disk(query_result):
        # Parse "disk\tpart" TSV rows into {part_name: disk_name}.
        part_to_disk = {}
        for row in query_result.strip().split("\n"):
            disk, part = row.split("\t")
            part_to_disk[part] = disk
        return part_to_disk

    part_to_disk = get_part_to_disk(
        node1.query(
            "SELECT disk_name, name FROM system.parts where table = 'test_concurrent_move' and active"
        )
    )
    assert all([value == "s3" for value in part_to_disk.values()])

    part_to_disk = get_part_to_disk(
        node2.query(
            "SELECT disk_name, name FROM system.parts where table = 'test_concurrent_move' and active"
        )
    )
    assert all([value == "s3" for value in part_to_disk.values()])
def test_zero_copy_mutation(started_cluster):
    """Run a slow mutation on one replica while the other has its replication
    queue stopped, then verify (via logs) that the second replica observed the
    zero-copy exclusive lock, waited for it, and cleaned it up.

    Fix vs. original: ``run_long_mutation`` accepted a ``node`` argument but
    ignored it and always queried ``node1``; it now uses the argument.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]
    node1.query(
        """
CREATE TABLE test_zero_copy_mutation (EventDate Date, CounterID UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_zero_copy_mutation', 'r1')
ORDER BY (CounterID, EventDate)
SETTINGS index_granularity = 8192, storage_policy = 's3_only'"""
    )

    node2.query(
        """
CREATE TABLE test_zero_copy_mutation (EventDate Date, CounterID UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_zero_copy_mutation', 'r2')
ORDER BY (CounterID, EventDate)
SETTINGS index_granularity = 8192, storage_policy = 's3_only'"""
    )

    node1.query(
        "INSERT INTO test_zero_copy_mutation SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), number * number from system.numbers limit 10"
    )

    node2.query("SYSTEM STOP REPLICATION QUEUES test_zero_copy_mutation")
    p = Pool(3)

    def run_long_mutation(node):
        # sleepEachRow(1) makes the mutation slow enough for node2 to observe
        # the zero-copy lock while its replication queue is stopped.
        node.query(
            "ALTER TABLE test_zero_copy_mutation DELETE WHERE sleepEachRow(1) == 1"
        )

    job = p.apply_async(run_long_mutation, (node1,))

    # Wait (up to ~3s) until the mutation shows up in node1's replication queue.
    for i in range(30):
        count = node1.query(
            "SELECT count() FROM system.replication_queue WHERE type = 'MUTATE_PART'"
        ).strip()
        if int(count) > 0:
            break
        else:
            time.sleep(0.1)

    node2.query("SYSTEM START REPLICATION QUEUES test_zero_copy_mutation")

    node2.query("SYSTEM SYNC REPLICA test_zero_copy_mutation")

    job.get()

    assert node2.contains_in_log("all_0_0_0_1/part_exclusive_lock exists")
    assert node2.contains_in_log("Removing zero-copy lock on")
    assert node2.contains_in_log("all_0_0_0_1/part_exclusive_lock doesn't exist")

View File

@ -1,2 +0,0 @@
set input_format_parallel_parsing=1;
select * from format(BSONEachRow, 'x UInt32', x'00000000'); -- {serverError INCORRECT_DATA}

View File

@ -0,0 +1,8 @@
25400_marked_dropped_tables MergeTree
index UInt32
database String
table String
uuid UUID
engine String
metadata_dropped_path String
table_dropped_time DateTime

View File

@ -0,0 +1,11 @@
-- Tags: no-ordinary-database
-- Smoke test for system.marked_dropped_tables: drop a table asynchronously and
-- check it appears in the listing, then verify the system table's schema.
SET database_atomic_wait_for_drop_and_detach_synchronously = 0;
DROP TABLE IF EXISTS 25400_marked_dropped_tables;

-- Create and immediately drop; with async drop the table stays "marked dropped".
CREATE TABLE 25400_marked_dropped_tables (id Int32) Engine=MergeTree() ORDER BY id;
DROP TABLE 25400_marked_dropped_tables;

SELECT table, engine FROM system.marked_dropped_tables WHERE database = currentDatabase() LIMIT 1;
DESCRIBE TABLE system.marked_dropped_tables;