Merge branch 'master' into fix-current-size-count-in-cache

This commit is contained in:
Kseniia Sumarokova 2022-05-06 11:18:43 +02:00 committed by GitHub
commit 18b5f28ec4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 268 additions and 276 deletions

View File

@ -1,185 +1,148 @@
Checks: '-*,
misc-misleading-bidirectional,
misc-misleading-identifier,
misc-misplaced-const,
misc-redundant-expression,
misc-static-assert,
misc-throw-by-value-catch-by-reference,
misc-unconventional-assign-operator,
misc-uniqueptr-reset-release,
misc-unused-alias-decls,
misc-unused-parameters,
misc-unused-using-decls,
Checks: '*,
-abseil-*,
modernize-avoid-bind,
modernize-loop-convert,
modernize-macro-to-enum,
modernize-make-shared,
modernize-make-unique,
modernize-raw-string-literal,
modernize-redundant-void-arg,
modernize-replace-random-shuffle,
modernize-shrink-to-fit,
modernize-use-bool-literals,
modernize-use-equals-default,
modernize-use-equals-delete,
modernize-use-nullptr,
modernize-use-transparent-functors,
modernize-use-uncaught-exceptions,
modernize-use-using,
-altera-*,
performance-faster-string-find,
performance-for-range-copy,
performance-implicit-conversion-in-loop,
performance-inefficient-algorithm,
performance-inefficient-vector-operation,
performance-move-const-arg,
performance-move-constructor-init,
performance-no-automatic-move,
performance-noexcept-move-constructor,
performance-trivially-destructible,
performance-unnecessary-copy-initialization,
-android-*,
readability-avoid-const-params-in-decls,
readability-const-return-type,
readability-container-contains,
readability-container-size-empty,
readability-convert-member-functions-to-static,
readability-delete-null-pointer,
readability-deleted-default,
readability-duplicate-include,
readability-identifier-naming,
readability-inconsistent-declaration-parameter-name,
readability-make-member-function-const,
readability-misplaced-array-index,
readability-non-const-parameter,
readability-qualified-auto,
readability-redundant-access-specifiers,
readability-redundant-control-flow,
readability-redundant-function-ptr-dereference,
readability-redundant-member-init,
readability-redundant-preprocessor,
readability-redundant-smartptr-get,
readability-redundant-string-cstr,
readability-redundant-string-init,
readability-simplify-boolean-expr,
readability-simplify-subscript-expr,
readability-static-definition-in-anonymous-namespace,
readability-string-compare,
readability-uniqueptr-delete-release,
-bugprone-assert-side-effect,
-bugprone-branch-clone,
-bugprone-dynamic-static-initializers,
-bugprone-easily-swappable-parameters,
-bugprone-exception-escape,
-bugprone-forwarding-reference-overload,
-bugprone-implicit-widening-of-multiplication-result,
-bugprone-lambda-function-name,
-bugprone-misplaced-widening-cast,
-bugprone-narrowing-conversions,
-bugprone-no-escape,
-bugprone-not-null-terminated-result,
-bugprone-signal-handler,
-bugprone-spuriously-wake-up-functions,
-bugprone-suspicious-semicolon,
-bugprone-unhandled-exception-at-new,
-bugprone-unhandled-self-assignment,
bugprone-argument-comment,
bugprone-bad-signal-to-kill-thread,
bugprone-bool-pointer-implicit-conversion,
bugprone-copy-constructor-init,
bugprone-dangling-handle,
bugprone-fold-init-type,
bugprone-forward-declaration-namespace,
bugprone-inaccurate-erase,
bugprone-incorrect-roundings,
bugprone-infinite-loop,
bugprone-integer-division,
bugprone-lambda-function-name,
bugprone-macro-parentheses,
bugprone-macro-repeated-side-effects,
bugprone-misplaced-operator-in-strlen-in-alloc,
bugprone-misplaced-pointer-artithmetic-in-alloc,
bugprone-misplaced-widening-cast,
bugprone-move-forwarding-reference,
bugprone-multiple-statement-macro,
bugprone-parent-virtual-call,
bugprone-posix-return,
bugprone-redundant-branch-condition,
bugprone-reserved-identifier,
bugprone-shared-ptr-array-mismatch,
bugprone-signed-char-misuse,
bugprone-sizeof-container,
bugprone-sizeof-expression,
bugprone-string-constructor,
bugprone-string-integer-assignment,
bugprone-string-literal-with-embedded-nul,
bugprone-stringview-nullptr,
bugprone-suspicious-enum-usage,
bugprone-suspicious-include,
bugprone-suspicious-memory-comparison,
bugprone-suspicious-memset-usage,
bugprone-suspicious-missing-comma,
bugprone-suspicious-string-compare,
bugprone-swapped-arguments,
bugprone-terminating-continue,
bugprone-throw-keyword-missing,
bugprone-too-small-loop-variable,
bugprone-undefined-memory-manipulation,
bugprone-undelegated-constructor,
bugprone-unhandled-self-assignment,
bugprone-unused-raii,
bugprone-unused-return-value,
bugprone-use-after-move,
bugprone-virtual-near-miss,
-cert-dcl16-c,
-cert-dcl37-c,
-cert-dcl51-cpp,
-cert-dcl58-cpp,
-cert-err58-cpp,
-cert-err60-cpp,
-cert-msc32-c,
-cert-msc51-cpp,
-cert-oop54-cpp,
-cert-oop57-cpp,
-cert-oop58-cpp,
cert-dcl21-cpp,
cert-dcl50-cpp,
cert-env33-c,
cert-err34-c,
cert-err52-cpp,
cert-flp30-c,
cert-mem57-cpp,
cert-msc50-cpp,
cert-oop58-cpp,
-clang-analyzer-core.DynamicTypePropagation,
-clang-analyzer-core.uninitialized.CapturedBlockVariable,
google-build-explicit-make-pair,
google-build-namespaces,
google-default-arguments,
google-explicit-constructor,
google-readability-avoid-underscore-in-googletest-name,
google-readability-casting,
google-runtime-int,
google-runtime-operator,
-clang-analyzer-optin.performance.Padding,
-clang-analyzer-optin.portability.UnixAPI,
hicpp-exception-baseclass,
-clang-analyzer-security.insecureAPI.bzero,
-clang-analyzer-security.insecureAPI.strcpy,
clang-analyzer-core.CallAndMessage,
clang-analyzer-core.DivideZero,
clang-analyzer-core.NonNullParamChecker,
clang-analyzer-core.NullDereference,
clang-analyzer-core.StackAddressEscape,
clang-analyzer-core.UndefinedBinaryOperatorResult,
clang-analyzer-core.VLASize,
clang-analyzer-core.uninitialized.ArraySubscript,
clang-analyzer-core.uninitialized.Assign,
clang-analyzer-core.uninitialized.Branch,
clang-analyzer-core.uninitialized.CapturedBlockVariable,
clang-analyzer-core.uninitialized.UndefReturn,
clang-analyzer-cplusplus.InnerPointer,
clang-analyzer-cplusplus.Move,
clang-analyzer-cplusplus.NewDelete,
clang-analyzer-cplusplus.NewDeleteLeaks,
clang-analyzer-cplusplus.PlacementNewChecker,
clang-analyzer-cplusplus.SelfAssignment,
clang-analyzer-deadcode.DeadStores,
clang-analyzer-optin.cplusplus.UninitializedObject,
clang-analyzer-optin.cplusplus.VirtualCall,
clang-analyzer-security.insecureAPI.UncheckedReturn,
clang-analyzer-security.insecureAPI.bcmp,
clang-analyzer-security.insecureAPI.bcopy,
clang-analyzer-security.insecureAPI.bzero,
clang-analyzer-security.insecureAPI.getpw,
clang-analyzer-security.insecureAPI.gets,
clang-analyzer-security.insecureAPI.mkstemp,
clang-analyzer-security.insecureAPI.mktemp,
clang-analyzer-security.insecureAPI.rand,
clang-analyzer-security.insecureAPI.strcpy,
clang-analyzer-unix.Malloc,
clang-analyzer-unix.MallocSizeof,
clang-analyzer-unix.MismatchedDeallocator,
clang-analyzer-unix.Vfork,
clang-analyzer-unix.cstring.BadSizeArg,
clang-analyzer-unix.cstring.NullArg,
-cppcoreguidelines-*,
boost-use-to-string,
-concurrency-mt-unsafe,
alpha.security.cert.env.InvalidPtr,
-darwin-*,
-fuchsia-*,
-google-build-using-namespace,
-google-global-names-in-headers,
-google-readability-braces-around-statements,
-google-readability-function-size,
-google-readability-namespace-comments,
-google-readability-todo,
-google-upgrade-googletest-case,
-hicpp-avoid-c-arrays,
-hicpp-avoid-goto,
-hicpp-braces-around-statements,
-hicpp-deprecated-headers,
-hicpp-explicit-conversions,
-hicpp-function-size,
-hicpp-invalid-access-moved,
-hicpp-member-init,
-hicpp-move-const-arg,
-hicpp-multiway-paths-covered,
-hicpp-named-parameter,
-hicpp-no-array-decay,
-hicpp-no-assembler,
-hicpp-no-malloc,
-hicpp-signed-bitwise,
-hicpp-special-member-functions,
-hicpp-uppercase-literal-suffix,
-hicpp-use-auto,
-hicpp-use-emplace,
-hicpp-use-equals-default,
-hicpp-use-noexcept,
-hicpp-use-override,
-hicpp-vararg,
-llvm-*,
-llvmlibc-*,
-openmp-*,
-misc-definitions-in-headers,
-misc-new-delete-overloads,
-misc-no-recursion,
-misc-non-copyable-objects,
-misc-non-private-member-variables-in-classes,
-misc-static-assert,
-modernize-avoid-c-arrays,
-modernize-concat-nested-namespaces,
-modernize-deprecated-headers,
-modernize-deprecated-ios-base-aliases,
-modernize-pass-by-value,
-modernize-replace-auto-ptr,
-modernize-replace-disallow-copy-and-assign-macro,
-modernize-return-braced-init-list,
-modernize-unary-static-assert,
-modernize-use-auto,
-modernize-use-default-member-init,
-modernize-use-emplace,
-modernize-use-equals-default,
-modernize-use-nodiscard,
-modernize-use-noexcept,
-modernize-use-override,
-modernize-use-trailing-return-type,
-performance-inefficient-string-concatenation,
-performance-no-int-to-ptr,
-performance-type-promotion-in-math-fn,
-performance-trivially-destructible,
-performance-unnecessary-value-param,
-portability-simd-intrinsics,
-readability-convert-member-functions-to-static,
-readability-braces-around-statements,
-readability-else-after-return,
-readability-function-cognitive-complexity,
-readability-function-size,
-readability-implicit-bool-conversion,
-readability-isolate-declaration,
-readability-magic-numbers,
-readability-misleading-indentation,
-readability-named-parameter,
-readability-qualified-auto,
-readability-redundant-declaration,
-readability-static-accessed-through-instance,
-readability-suspicious-call-argument,
-readability-uppercase-literal-suffix,
-readability-use-anyofallof,
-zirkon-*,
'
WarningsAsErrors: '*'
CheckOptions:

View File

@ -28,6 +28,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/UseSSL.h>
#include <IO/IOThreadPool.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTInsertQuery.h>
#include <Common/ErrorHandlers.h>
@ -105,6 +106,17 @@ void LocalServer::initialize(Poco::Util::Application & self)
auto loaded_config = config_processor.loadConfig();
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
}
GlobalThreadPool::initialize(
config().getUInt("max_thread_pool_size", 10000),
config().getUInt("max_thread_pool_free_size", 1000),
config().getUInt("thread_pool_queue_size", 10000)
);
IOThreadPool::initialize(
config().getUInt("max_io_thread_pool_size", 100),
config().getUInt("max_io_thread_pool_free_size", 0),
config().getUInt("io_thread_pool_queue_size", 10000));
}

View File

@ -367,7 +367,7 @@
<!-- Path to temporary data for processing hard queries. -->
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
<!-- Disable AuthType plaintext_password and no_password for ACL. -->
<!-- <allow_plaintext_password>0</allow_plaintext_password> -->
<!-- <allow_no_password>0</allow_no_password> -->`
@ -1297,8 +1297,8 @@
-->
<!-- Uncomment if enable merge tree metadata cache -->
<merge_tree_metadata_cache>
<!--merge_tree_metadata_cache>
<lru_cache_size>268435456</lru_cache_size>
<continue_if_corrupted>true</continue_if_corrupted>
</merge_tree_metadata_cache>
</merge_tree_metadata_cache-->
</clickhouse>

View File

@ -1430,15 +1430,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
apply_query_settings(*with_output->settings_ast);
if (!connection->checkConnected())
{
auto poco_logs_level = Poco::Logger::parseLevel(config().getString("send_logs_level", "none"));
/// Print under WARNING also because it is used by clickhouse-test.
if (poco_logs_level >= Poco::Message::PRIO_WARNING)
{
fmt::print(stderr, "Connection lost. Reconnecting.\n");
}
connect();
}
ASTPtr input_function;
if (insert && insert->select)

View File

@ -410,7 +410,7 @@ Packet LocalConnection::receivePacket()
{
if (state->profile_info)
{
packet.profile_info = std::move(*state->profile_info);
packet.profile_info = *state->profile_info;
state->profile_info.reset();
}
next_packet_type.reset();

View File

@ -59,12 +59,6 @@ Exception::Exception(const std::string & msg, int code, bool remote_)
handle_error_code(msg, code, remote, getStackFramePointers());
}
Exception::Exception(const std::string & msg, const Exception & nested, int code)
: Poco::Exception(msg, nested, code)
{
handle_error_code(msg, code, remote, getStackFramePointers());
}
Exception::Exception(CreateFromPocoTag, const Poco::Exception & exc)
: Poco::Exception(exc.displayText(), ErrorCodes::POCO_EXCEPTION)
{

View File

@ -29,7 +29,6 @@ public:
Exception() = default;
Exception(const std::string & msg, int code, bool remote_ = false);
Exception(const std::string & msg, const Exception & nested, int code);
Exception(int code, const std::string & message)
: Exception(message, code)

View File

@ -216,9 +216,9 @@ private:
FileSegmentCell(FileSegmentPtr file_segment_, LRUFileCache * cache, std::lock_guard<std::mutex> & cache_lock);
FileSegmentCell(FileSegmentCell && other)
FileSegmentCell(FileSegmentCell && other) noexcept
: file_segment(std::move(other.file_segment))
, queue_iterator(std::move(other.queue_iterator)) {}
, queue_iterator(other.queue_iterator) {}
};
using FileSegmentsByOffset = std::map<size_t, FileSegmentCell>;

View File

@ -227,7 +227,7 @@ private:
struct FileSegmentsHolder : private boost::noncopyable
{
explicit FileSegmentsHolder(FileSegments && file_segments_) : file_segments(std::move(file_segments_)) {}
FileSegmentsHolder(FileSegmentsHolder && other) : file_segments(std::move(other.file_segments)) {}
FileSegmentsHolder(FileSegmentsHolder && other) noexcept : file_segments(std::move(other.file_segments)) {}
~FileSegmentsHolder();

View File

@ -26,7 +26,6 @@ namespace ErrorCodes
extern const int PATH_ACCESS_DENIED;;
extern const int FILE_DOESNT_EXIST;
extern const int BAD_FILE_TYPE;
extern const int MEMORY_LIMIT_EXCEEDED;
}
@ -91,70 +90,55 @@ IDiskRemote::Metadata IDiskRemote::Metadata::createAndStoreMetadataIfNotExists(c
void IDiskRemote::Metadata::load()
{
try
const ReadSettings read_settings;
auto buf = metadata_disk->readFile(metadata_file_path, read_settings, 1024); /* reasonable buffer size for small file */
UInt32 version;
readIntText(version, *buf);
if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_READ_ONLY_FLAG)
throw Exception(
ErrorCodes::UNKNOWN_FORMAT,
"Unknown metadata file version. Path: {}. Version: {}. Maximum expected version: {}",
metadata_disk->getPath() + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG));
assertChar('\n', *buf);
UInt32 remote_fs_objects_count;
readIntText(remote_fs_objects_count, *buf);
assertChar('\t', *buf);
readIntText(total_size, *buf);
assertChar('\n', *buf);
remote_fs_objects.resize(remote_fs_objects_count);
for (size_t i = 0; i < remote_fs_objects_count; ++i)
{
const ReadSettings read_settings;
auto buf = metadata_disk->readFile(metadata_file_path, read_settings, 1024); /* reasonable buffer size for small file */
UInt32 version;
readIntText(version, *buf);
if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_READ_ONLY_FLAG)
throw Exception(
ErrorCodes::UNKNOWN_FORMAT,
"Unknown metadata file version. Path: {}. Version: {}. Maximum expected version: {}",
metadata_disk->getPath() + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG));
assertChar('\n', *buf);
UInt32 remote_fs_objects_count;
readIntText(remote_fs_objects_count, *buf);
String remote_fs_object_path;
size_t remote_fs_object_size;
readIntText(remote_fs_object_size, *buf);
assertChar('\t', *buf);
readIntText(total_size, *buf);
assertChar('\n', *buf);
remote_fs_objects.resize(remote_fs_objects_count);
for (size_t i = 0; i < remote_fs_objects_count; ++i)
readEscapedString(remote_fs_object_path, *buf);
if (version == VERSION_ABSOLUTE_PATHS)
{
String remote_fs_object_path;
size_t remote_fs_object_size;
readIntText(remote_fs_object_size, *buf);
assertChar('\t', *buf);
readEscapedString(remote_fs_object_path, *buf);
if (version == VERSION_ABSOLUTE_PATHS)
{
if (!remote_fs_object_path.starts_with(remote_fs_root_path))
throw Exception(ErrorCodes::UNKNOWN_FORMAT,
"Path in metadata does not correspond to root path. Path: {}, root path: {}, disk path: {}",
remote_fs_object_path, remote_fs_root_path, metadata_disk->getPath());
if (!remote_fs_object_path.starts_with(remote_fs_root_path))
throw Exception(ErrorCodes::UNKNOWN_FORMAT,
"Path in metadata does not correspond to root path. Path: {}, root path: {}, disk path: {}",
remote_fs_object_path, remote_fs_root_path, metadata_disk->getPath());
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
}
assertChar('\n', *buf);
remote_fs_objects[i].relative_path = remote_fs_object_path;
remote_fs_objects[i].bytes_size = remote_fs_object_size;
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
}
readIntText(ref_count, *buf);
assertChar('\n', *buf);
if (version >= VERSION_READ_ONLY_FLAG)
{
readBoolText(read_only, *buf);
assertChar('\n', *buf);
}
remote_fs_objects[i].relative_path = remote_fs_object_path;
remote_fs_objects[i].bytes_size = remote_fs_object_size;
}
catch (Exception & e)
readIntText(ref_count, *buf);
assertChar('\n', *buf);
if (version >= VERSION_READ_ONLY_FLAG)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
throw;
if (e.code() == ErrorCodes::MEMORY_LIMIT_EXCEEDED)
throw;
throw Exception("Failed to read metadata file: " + metadata_file_path, e, ErrorCodes::UNKNOWN_FORMAT);
readBoolText(read_only, *buf);
assertChar('\n', *buf);
}
}
@ -166,7 +150,6 @@ IDiskRemote::Metadata::Metadata(
: remote_fs_root_path(remote_fs_root_path_)
, metadata_file_path(metadata_file_path_)
, metadata_disk(metadata_disk_)
, total_size(0), ref_count(0)
{
}

View File

@ -161,7 +161,7 @@ std::unique_ptr<DiskS3Settings> getSettings(const Poco::Util::AbstractConfigurat
return std::make_unique<DiskS3Settings>(
getClient(config, config_prefix, context),
std::move(rw_settings),
rw_settings,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getBool(config_prefix + ".send_metadata", false),
config.getInt(config_prefix + ".thread_pool_size", 16),

View File

@ -37,7 +37,7 @@ BlockIO & BlockIO::operator= (BlockIO && rhs) noexcept
finish_callback = std::move(rhs.finish_callback);
exception_callback = std::move(rhs.exception_callback);
null_format = std::move(rhs.null_format);
null_format = rhs.null_format;
return *this;
}

View File

@ -958,9 +958,8 @@ void IMergeTreeDataPart::appendFilesOfPartitionAndMinMaxIndex(Strings & files) c
if (!parent_part)
partition.appendFiles(storage, files);
if (!isEmpty())
if (!parent_part)
minmax_idx->appendFiles(storage, files);
if (!parent_part)
minmax_idx->appendFiles(storage, files);
}
void IMergeTreeDataPart::loadChecksums(bool require)

View File

@ -2910,7 +2910,13 @@ void MergeTreeData::removePartsFromWorkingSet(
removePartsFromWorkingSet(txn, remove, clear_without_timeout, lock);
}
MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(
void MergeTreeData::removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock)
{
removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(txn, drop_range, lock);
}
MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock)
{
DataPartsVector parts_to_remove;
@ -2981,14 +2987,21 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(
bool clear_without_timeout = true;
/// We a going to remove active parts covered by drop_range without timeout.
/// Let's also reset timeout for inactive parts.
/// Let's also reset timeout for inactive parts
/// and add these parts to list of parts to remove from ZooKeeper
auto inactive_parts_to_remove_immediately = getDataPartsVectorInPartitionForInternalUsage(DataPartState::Outdated, drop_range.partition_id, &lock);
for (auto & part : inactive_parts_to_remove_immediately)
part->remove_time.store(0, std::memory_order_relaxed);
/// FIXME refactor removePartsFromWorkingSet(...), do not remove parts twice
removePartsFromWorkingSet(txn, parts_to_remove, clear_without_timeout, lock);
for (auto & part : inactive_parts_to_remove_immediately)
{
if (!drop_range.contains(part->info))
continue;
part->remove_time.store(0, std::memory_order_relaxed);
parts_to_remove.push_back(std::move(part));
}
return parts_to_remove;
}

View File

@ -578,11 +578,15 @@ public:
void removePartsFromWorkingSet(MergeTreeTransaction * txn, const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock * acquired_lock = nullptr);
void removePartsFromWorkingSet(MergeTreeTransaction * txn, const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock);
/// Removes all parts from the working set parts
/// for which (partition_id = drop_range.partition_id && min_block >= drop_range.min_block && max_block <= drop_range.max_block).
/// Used in REPLACE PARTITION command;
DataPartsVector removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range,
DataPartsLock & lock);
/// Removes all parts covered by drop_range from the working set parts.
/// Used in REPLACE PARTITION command.
void removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);
/// Same as above, but also returns list of parts to remove from ZooKeeper.
/// It includes parts that have been just removed by these method
/// and Outdated parts covered by drop_range that were removed earlier for any reason.
DataPartsVector removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);
/// Restores Outdated part and adds it to working set
void restoreAndActivatePart(const DataPartPtr & part, DataPartsLock * acquired_lock = nullptr);

View File

@ -151,13 +151,13 @@ bool ReplicatedMergeTreeRestartingThread::runImpl()
setNotReadonly();
/// Start queue processing
storage.part_check_thread.start();
storage.background_operations_assignee.start();
storage.queue_updating_task->activateAndSchedule();
storage.mutations_updating_task->activateAndSchedule();
storage.mutations_finalizing_task->activateAndSchedule();
storage.merge_selecting_task->activateAndSchedule();
storage.cleanup_thread.start();
storage.part_check_thread.start();
return true;
}
@ -374,7 +374,6 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shut
storage.mutations_finalizing_task->deactivate();
storage.cleanup_thread.stop();
storage.part_check_thread.stop();
/// Stop queue processing
{
@ -384,6 +383,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shut
storage.background_operations_assignee.finish();
}
/// Stop part_check_thread after queue processing, because some queue tasks may restart part_check_thread
storage.part_check_thread.stop();
LOG_TRACE(log, "Threads finished");
}

View File

@ -1901,7 +1901,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
DataPartsVector parts_to_remove;
{
auto data_parts_lock = lockParts();
parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range_info, data_parts_lock);
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range_info, data_parts_lock);
if (parts_to_remove.empty())
{
if (!drop_range_info.isFakeDropRangePart())
@ -2037,7 +2037,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
if (parts_to_add.empty() && replace)
{
parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, data_parts_lock);
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range, data_parts_lock);
String parts_to_remove_str;
for (const auto & part : parts_to_remove)
{
@ -2181,8 +2181,32 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
{
if (adding_parts_active_set.getContainingPart(part_desc->new_part_info).empty())
{
throw Exception("Not found part " + part_desc->new_part_name +
" (or part covering it) neither source table neither remote replicas" , ErrorCodes::NO_REPLICA_HAS_PART);
/// We should enqueue missing part for check, so it will be replaced with empty one (if needed)
/// and we will be able to execute this REPLACE_RANGE.
/// However, it's quite dangerous, because part may appear in source table.
/// So we enqueue it for check only if no replicas of source table have part either.
bool need_check = true;
if (auto * replicated_src_table = typeid_cast<StorageReplicatedMergeTree *>(source_table.get()))
{
String src_replica = replicated_src_table->findReplicaHavingPart(part_desc->src_part_name, false);
if (!src_replica.empty())
{
LOG_DEBUG(log, "Found part {} on replica {} of source table, will not check part {} required for {}",
part_desc->src_part_name, src_replica, part_desc->new_part_name, entry.znode_name);
need_check = false;
}
}
if (need_check)
{
LOG_DEBUG(log, "Will check part {} required for {}, because no replicas have it (including replicas of source table)",
part_desc->new_part_name, entry.znode_name);
enqueuePartForCheck(part_desc->new_part_name);
}
throw Exception(ErrorCodes::NO_REPLICA_HAS_PART,
"Not found part {} (or part covering it) neither source table neither remote replicas",
part_desc->new_part_name);
}
}
@ -2287,7 +2311,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
transaction.commit(&data_parts_lock);
if (replace)
{
parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, data_parts_lock);
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range, data_parts_lock);
String parts_to_remove_str;
for (const auto & part : parts_to_remove)
{
@ -6542,7 +6566,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
auto data_parts_lock = lockParts();
transaction.commit(&data_parts_lock);
if (replace)
parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, data_parts_lock);
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range, data_parts_lock);
}
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());
@ -6765,7 +6789,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
else
zkutil::KeeperMultiException::check(code, ops, op_results);
parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, lock);
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range, lock);
transaction.commit(&lock);
}

View File

@ -0,0 +1,6 @@
<clickhouse>
<merge_tree_metadata_cache>
<lru_cache_size>268435456</lru_cache_size>
<continue_if_corrupted>true</continue_if_corrupted>
</merge_tree_metadata_cache>
</clickhouse>

View File

@ -31,6 +31,7 @@ ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/c
ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/metadata_cache.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/tcp_with_proxy.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/top_level_domains_lists.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/top_level_domains_path.xml $DEST_SERVER_PATH/config.d/

View File

@ -12,7 +12,7 @@ import pika
import pytest
from google.protobuf.internal.encoder import _VarintBytes
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, check_rabbitmq_is_available
from helpers.test_tools import TSV
from . import rabbitmq_pb2
@ -46,7 +46,7 @@ def wait_rabbitmq_to_start(rabbitmq_docker_id, timeout=180):
start = time.time()
while time.time() - start < timeout:
try:
if instance.cluster.check_rabbitmq_is_available(rabbitmq_docker_id):
if check_rabbitmq_is_available(rabbitmq_docker_id):
logging.debug("RabbitMQ is available")
return
time.sleep(0.5)