From 7d6beb55877936d73545fd742bdc33f1012cb26a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 25 Oct 2024 20:00:24 +0200 Subject: [PATCH 1/4] Add a lot of minor things from the private repository --- src/Access/AccessControl.h | 2 ++ src/Access/Authentication.cpp | 1 + src/Access/Common/AccessType.h | 1 + src/Access/RoleCache.h | 4 ++++ src/Access/tests/gtest_access_rights_ops.cpp | 3 ++- src/Core/ServerUUID.cpp | 5 +++++ src/Core/ServerUUID.h | 3 +++ src/Core/UUID.h | 3 +++ src/Databases/enableAllExperimentalSettings.cpp | 2 ++ src/Dictionaries/ExecutablePoolDictionarySource.cpp | 7 +++++++ src/Dictionaries/RedisDictionarySource.cpp | 1 - src/Dictionaries/XDBCDictionarySource.cpp | 6 ++++++ src/Disks/DiskEncrypted.h | 2 ++ src/Disks/DiskEncryptedTransaction.cpp | 1 - src/Disks/DiskType.h | 2 ++ src/Disks/IDisk.h | 3 ++- src/Disks/IO/ReadBufferFromRemoteFSGather.cpp | 2 +- src/Disks/ObjectStorages/DiskObjectStorageMetadata.h | 2 ++ src/Disks/ObjectStorages/MetadataStorageFromDisk.h | 2 ++ src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp | 1 + src/IO/ReadBufferFromPocoSocket.cpp | 5 +++++ src/IO/ReadBufferFromPocoSocket.h | 2 ++ src/IO/S3Common.cpp | 1 - src/Interpreters/ActionLocksManager.cpp | 2 ++ src/Interpreters/BlobStorageLog.cpp | 1 + src/Interpreters/Cache/LRUFileCachePriority.h | 4 +++- src/Interpreters/Cache/SLRUFileCachePriority.h | 5 +++++ src/Interpreters/DatabaseCatalog.h | 6 ++++++ src/Interpreters/InterpreterSystemQuery.h | 3 +++ src/Interpreters/MutationsInterpreter.h | 5 ++++- src/Interpreters/Session.h | 1 - src/Interpreters/Squashing.cpp | 1 + src/Interpreters/executeDDLQueryOnCluster.cpp | 7 +++++++ src/Parsers/CommonParsers.h | 1 + src/Parsers/IAST.cpp | 1 + src/Server/CloudPlacementInfo.cpp | 3 +++ src/Storages/MergeTree/FutureMergedMutatedPart.h | 1 + src/Storages/MergeTree/IMergeTreeReader.h | 1 + src/Storages/MergeTree/MergeFromLogEntryTask.cpp | 2 ++ src/Storages/MergeTree/MergeProjectionPartsTask.cpp | 3 +++ src/Storages/MergeTree/MergeTreeDataFormatVersion.h | 4 ++-- src/Storages/MergeTree/MergeTreeDataMergerMutator.h | 2 ++ src/Storages/MergeTree/MergeTreeDataPartType.h | 1 + src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp | 4 ++-- src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp | 8 ++++++++ src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h | 1 + src/Storages/MergeTree/MergeTreeMutationStatus.cpp | 4 ++-- src/Storages/MergeTree/MergeTreePartInfo.h | 7 +++++++ src/Storages/MergeTree/MergeTreeRangeReader.h | 2 +- src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp | 4 +++- src/Storages/MergeTree/checkDataPart.cpp | 2 +- src/Storages/ObjectStorage/S3/Configuration.h | 1 + src/Storages/StorageGenerateRandom.cpp | 3 +++ src/Storages/TableZnodeInfo.h | 2 ++ 54 files changed, 135 insertions(+), 18 deletions(-) diff --git a/src/Access/AccessControl.h b/src/Access/AccessControl.h index a91686433ec..a342c5300bf 100644 --- a/src/Access/AccessControl.h +++ b/src/Access/AccessControl.h @@ -9,6 +9,8 @@ #include +#include "config.h" + namespace Poco { diff --git a/src/Access/Authentication.cpp b/src/Access/Authentication.cpp index 8d5d04a4ed2..1d69a659cd6 100644 --- a/src/Access/Authentication.cpp +++ b/src/Access/Authentication.cpp @@ -12,6 +12,7 @@ #include "config.h" + namespace DB { diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index e9f24a8c685..383e7f70420 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -193,6 +193,7 @@ enum class AccessType : uint8_t M(SYSTEM_SENDS, "SYSTEM STOP SENDS, SYSTEM START SENDS, STOP SENDS, START SENDS", GROUP, SYSTEM) \ M(SYSTEM_REPLICATION_QUEUES, "SYSTEM STOP REPLICATION QUEUES, SYSTEM START REPLICATION QUEUES, STOP REPLICATION QUEUES, START REPLICATION QUEUES", TABLE, SYSTEM) \ M(SYSTEM_VIRTUAL_PARTS_UPDATE, "SYSTEM STOP VIRTUAL PARTS UPDATE, SYSTEM START VIRTUAL PARTS UPDATE, STOP VIRTUAL PARTS UPDATE, START VIRTUAL PARTS UPDATE", TABLE, SYSTEM) \ + M(SYSTEM_REDUCE_BLOCKING_PARTS, "SYSTEM STOP REDUCE BLOCKING PARTS, SYSTEM START REDUCE BLOCKING PARTS, STOP REDUCE BLOCKING PARTS, START REDUCE BLOCKING PARTS", TABLE, SYSTEM) \ M(SYSTEM_DROP_REPLICA, "DROP REPLICA", TABLE, SYSTEM) \ M(SYSTEM_SYNC_REPLICA, "SYNC REPLICA", TABLE, SYSTEM) \ M(SYSTEM_REPLICA_READINESS, "SYSTEM REPLICA READY, SYSTEM REPLICA UNREADY", GLOBAL, SYSTEM) \ diff --git a/src/Access/RoleCache.h b/src/Access/RoleCache.h index 75d1fd32685..b707a05346f 100644 --- a/src/Access/RoleCache.h +++ b/src/Access/RoleCache.h @@ -22,6 +22,10 @@ public: const std::vector & current_roles, const std::vector & current_roles_with_admin_option); + std::shared_ptr getEnabledRoles( + boost::container::flat_set current_roles, + boost::container::flat_set current_roles_with_admin_option); + private: using SubscriptionsOnRoles = std::vector>; diff --git a/src/Access/tests/gtest_access_rights_ops.cpp b/src/Access/tests/gtest_access_rights_ops.cpp index 902fc949840..41567905a10 100644 --- a/src/Access/tests/gtest_access_rights_ops.cpp +++ b/src/Access/tests/gtest_access_rights_ops.cpp @@ -284,7 +284,8 @@ TEST(AccessRights, Union) "CREATE DICTIONARY, DROP DATABASE, DROP TABLE, DROP VIEW, DROP DICTIONARY, UNDROP TABLE, " "TRUNCATE, OPTIMIZE, BACKUP, CREATE ROW POLICY, ALTER ROW POLICY, DROP ROW POLICY, " "SHOW ROW POLICIES, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, " - "SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM CLEANUP, SYSTEM VIEWS, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM VIRTUAL PARTS UPDATE, " + "SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM CLEANUP, SYSTEM VIEWS, SYSTEM SENDS, " + "SYSTEM REPLICATION QUEUES, SYSTEM VIRTUAL PARTS UPDATE, SYSTEM REDUCE BLOCKING PARTS, " "SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, " "SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM SYNC DATABASE REPLICA, SYSTEM FLUSH DISTRIBUTED, " "SYSTEM UNLOAD PRIMARY KEY, dictGet ON db1.*, GRANT TABLE ENGINE ON db1, " diff --git a/src/Core/ServerUUID.cpp b/src/Core/ServerUUID.cpp index 251b407e673..5b17017e7f4 100644 --- a/src/Core/ServerUUID.cpp +++ b/src/Core/ServerUUID.cpp @@ -68,6 +68,11 @@ UUID loadServerUUID(const fs::path & server_uuid_file, Poco::Logger * log) } } +void ServerUUID::set(UUID & uuid) +{ + server_uuid = uuid; +} + void ServerUUID::setRandomForUnitTests() { server_uuid = UUIDHelpers::generateV4(); diff --git a/src/Core/ServerUUID.h b/src/Core/ServerUUID.h index 9c7f7d32acc..26711bfbfaa 100644 --- a/src/Core/ServerUUID.h +++ b/src/Core/ServerUUID.h @@ -20,6 +20,9 @@ public: /// Loads server UUID from file or creates new one. Should be called on daemon startup. static void load(const fs::path & server_uuid_file, Poco::Logger * log); + /// Sets specific server UUID. + static void set(UUID & uuid); + static void setRandomForUnitTests(); }; diff --git a/src/Core/UUID.h b/src/Core/UUID.h index 2bdefe9d3fc..1b8a075f0d2 100644 --- a/src/Core/UUID.h +++ b/src/Core/UUID.h @@ -64,6 +64,9 @@ namespace UUIDHelpers /// Generate random UUID. UUID generateV4(); + /// Generate UUID from hash of a string. + UUID makeUUIDv4FromHash(const String & string); + constexpr size_t HighBytes = (std::endian::native == std::endian::little) ? 0 : 1; constexpr size_t LowBytes = (std::endian::native == std::endian::little) ? 1 : 0; diff --git a/src/Databases/enableAllExperimentalSettings.cpp b/src/Databases/enableAllExperimentalSettings.cpp index d1b3b776370..6efbc429fd8 100644 --- a/src/Databases/enableAllExperimentalSettings.cpp +++ b/src/Databases/enableAllExperimentalSettings.cpp @@ -43,6 +43,8 @@ void enableAllExperimentalSettings(ContextMutablePtr context) context->setSetting("enable_zstd_qat_codec", 1); context->setSetting("allow_create_index_without_type", 1); context->setSetting("allow_experimental_s3queue", 1); + + /// clickhouse-private settings context->setSetting("allow_experimental_shared_set_join", 1); } diff --git a/src/Dictionaries/ExecutablePoolDictionarySource.cpp b/src/Dictionaries/ExecutablePoolDictionarySource.cpp index 403ce540e76..602fde0e0d7 100644 --- a/src/Dictionaries/ExecutablePoolDictionarySource.cpp +++ b/src/Dictionaries/ExecutablePoolDictionarySource.cpp @@ -26,6 +26,9 @@ namespace DB namespace Setting { extern const SettingsSeconds max_execution_time; + + /// Cloud only + extern const SettingsBool cloud_mode; } namespace ErrorCodes @@ -33,6 +36,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int DICTIONARY_ACCESS_DENIED; extern const int UNSUPPORTED_METHOD; + extern const int SUPPORT_IS_DISABLED; } ExecutablePoolDictionarySource::ExecutablePoolDictionarySource( @@ -192,6 +196,9 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory) const std::string & /* default_database */, bool created_from_ddl) -> DictionarySourcePtr { + if (global_context->getSettingsRef()[Setting::cloud_mode]) + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Dictionary source of type `executable pool` is disabled"); + if (dict_struct.has_expressions) throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary source of type `executable_pool` does not support attribute expressions"); diff --git a/src/Dictionaries/RedisDictionarySource.cpp b/src/Dictionaries/RedisDictionarySource.cpp index 17ed515ca9a..26d9ebae1b8 100644 --- a/src/Dictionaries/RedisDictionarySource.cpp +++ b/src/Dictionaries/RedisDictionarySource.cpp @@ -29,7 +29,6 @@ namespace DB ContextPtr global_context, const std::string & /* default_database */, bool /* created_from_ddl */) -> DictionarySourcePtr { - auto redis_config_prefix = config_prefix + ".redis"; auto host = config.getString(redis_config_prefix + ".host"); diff --git a/src/Dictionaries/XDBCDictionarySource.cpp b/src/Dictionaries/XDBCDictionarySource.cpp index ebb50f79497..4e64db5831d 100644 --- a/src/Dictionaries/XDBCDictionarySource.cpp +++ b/src/Dictionaries/XDBCDictionarySource.cpp @@ -28,6 +28,9 @@ namespace Setting { extern const SettingsSeconds http_receive_timeout; extern const SettingsBool odbc_bridge_use_connection_pooling; + + /// Cloud only + extern const SettingsBool cloud_mode; } namespace ErrorCodes @@ -242,6 +245,9 @@ void registerDictionarySourceXDBC(DictionarySourceFactory & factory) ContextPtr global_context, const std::string & /* default_database */, bool /* check_config */) -> DictionarySourcePtr { + + if (global_context->getSettingsRef()[Setting::cloud_mode]) + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Dictionary source of type `odbc` is disabled"); #if USE_ODBC BridgeHelperPtr bridge = std::make_shared>( global_context, diff --git a/src/Disks/DiskEncrypted.h b/src/Disks/DiskEncrypted.h index caba4184a73..95d9554b909 100644 --- a/src/Disks/DiskEncrypted.h +++ b/src/Disks/DiskEncrypted.h @@ -313,6 +313,8 @@ public: return std::make_shared(*this); } + /// Need to overwrite explicetly because this disk change + /// a lot of "delegate" methods. return createEncryptedTransaction(); } diff --git a/src/Disks/DiskEncryptedTransaction.cpp b/src/Disks/DiskEncryptedTransaction.cpp index 2660051e1d3..a528564fd1e 100644 --- a/src/Disks/DiskEncryptedTransaction.cpp +++ b/src/Disks/DiskEncryptedTransaction.cpp @@ -1,6 +1,5 @@ #include - #if USE_SSL #include #include diff --git a/src/Disks/DiskType.h b/src/Disks/DiskType.h index 347e2c1cfe3..bf7ef3d30eb 100644 --- a/src/Disks/DiskType.h +++ b/src/Disks/DiskType.h @@ -27,9 +27,11 @@ enum class MetadataStorageType : uint8_t { None, Local, + Keeper, Plain, PlainRewritable, StaticWeb, + Memory, }; MetadataStorageType metadataTypeFromString(const String & type); diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h index 59f58a816e9..692020c86a6 100644 --- a/src/Disks/IDisk.h +++ b/src/Disks/IDisk.h @@ -497,7 +497,7 @@ public: protected: - friend class DiskDecorator; + friend class DiskReadOnlyWrapper; const String name; @@ -580,6 +580,7 @@ inline String directoryPath(const String & path) return fs::path(path).parent_path() / ""; } + } template <> diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 7055a7018ce..8e4ec6f3dfb 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -21,7 +21,7 @@ namespace ErrorCodes size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size) { /// Only when cache is used we could download bigger portions of FileSegments than what we actually gonna read within particular task. - if (!settings.enable_filesystem_cache) + if (!settings.enable_filesystem_cache && !settings.read_through_distributed_cache) return settings.remote_fs_buffer_size; /// Buffers used for prefetch and pre-download better to have enough size, but not bigger than the whole file. diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h index 4f45f5b7ddf..456b3a4778d 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h +++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h @@ -56,6 +56,8 @@ public: void deserialize(ReadBuffer & buf); void deserializeFromString(const std::string & data); + /// This method was deleted from public fork recently by Azat + void createFromSingleObject(ObjectStorageKey object_key, size_t bytes_size, size_t ref_count_, bool is_read_only_); void serialize(WriteBuffer & buf, bool sync) const; std::string serializeToString() const; diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h index 5d56580a57b..922990bfdb7 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h +++ b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h @@ -71,6 +71,8 @@ public: DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock & lock) const; DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock & lock) const; + + bool isReadOnly() const override { return disk->isReadOnly(); } }; class MetadataStorageFromDiskTransaction final : public IMetadataTransaction, private MetadataOperationsHolder diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index cd36429d0a2..ece2608bea5 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include diff --git a/src/IO/ReadBufferFromPocoSocket.cpp b/src/IO/ReadBufferFromPocoSocket.cpp index bbf9f96404f..93562e7bfed 100644 --- a/src/IO/ReadBufferFromPocoSocket.cpp +++ b/src/IO/ReadBufferFromPocoSocket.cpp @@ -146,4 +146,9 @@ bool ReadBufferFromPocoSocketBase::poll(size_t timeout_microseconds) const return res; } +void ReadBufferFromPocoSocketBase::setReceiveTimeout(size_t receive_timeout_microseconds) +{ + socket.setReceiveTimeout(Poco::Timespan(receive_timeout_microseconds, 0)); +} + } diff --git a/src/IO/ReadBufferFromPocoSocket.h b/src/IO/ReadBufferFromPocoSocket.h index 912388adaac..2a0c0213302 100644 --- a/src/IO/ReadBufferFromPocoSocket.h +++ b/src/IO/ReadBufferFromPocoSocket.h @@ -34,6 +34,8 @@ public: ssize_t socketReceiveBytesImpl(char * ptr, size_t size); + void setReceiveTimeout(size_t receive_timeout_microseconds); + private: AsyncCallback async_callback; std::string socket_description; diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp index af5e0339a9f..214927684b3 100644 --- a/src/IO/S3Common.cpp +++ b/src/IO/S3Common.cpp @@ -49,7 +49,6 @@ bool S3Exception::isRetryableError() const } } - namespace DB::ErrorCodes { extern const int S3_ERROR; diff --git a/src/Interpreters/ActionLocksManager.cpp b/src/Interpreters/ActionLocksManager.cpp index 28803a94c80..da6e9d473da 100644 --- a/src/Interpreters/ActionLocksManager.cpp +++ b/src/Interpreters/ActionLocksManager.cpp @@ -20,6 +20,8 @@ namespace ActionLocks extern const StorageActionBlockType PullReplicationLog = 8; extern const StorageActionBlockType Cleanup = 9; extern const StorageActionBlockType ViewRefresh = 10; + extern const StorageActionBlockType VirtualPartsUpdate = 11; + extern const StorageActionBlockType ReduceBlockingParts = 12; } diff --git a/src/Interpreters/BlobStorageLog.cpp b/src/Interpreters/BlobStorageLog.cpp index f20ac9165ac..601005626e1 100644 --- a/src/Interpreters/BlobStorageLog.cpp +++ b/src/Interpreters/BlobStorageLog.cpp @@ -96,6 +96,7 @@ void BlobStorageLog::prepareTable() std::unique_lock lock{prepare_mutex}; const auto & relative_data_path = merge_tree_table->getRelativeDataPath(); prefix_to_ignore = normalizePath(relative_data_path); + LOG_DEBUG(log, "Will ignore blobs with prefix {}", prefix_to_ignore); } } diff --git a/src/Interpreters/Cache/LRUFileCachePriority.h b/src/Interpreters/Cache/LRUFileCachePriority.h index 0ca62b19d37..58f64b6e28d 100644 --- a/src/Interpreters/Cache/LRUFileCachePriority.h +++ b/src/Interpreters/Cache/LRUFileCachePriority.h @@ -12,7 +12,7 @@ namespace DB /// Based on the LRU algorithm implementation, the record with the lowest priority is stored at /// the head of the queue, and the record with the highest priority is stored at the tail. -class LRUFileCachePriority final : public IFileCachePriority +class LRUFileCachePriority : public IFileCachePriority { protected: struct State @@ -85,6 +85,8 @@ public: bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CachePriorityGuard::Lock &) override; + FileCachePriorityPtr copy() const { return std::make_unique(max_size, max_elements, state); } + private: class LRUIterator; using LRUQueue = std::list; diff --git a/src/Interpreters/Cache/SLRUFileCachePriority.h b/src/Interpreters/Cache/SLRUFileCachePriority.h index 23bc8c0908b..5649a12aff9 100644 --- a/src/Interpreters/Cache/SLRUFileCachePriority.h +++ b/src/Interpreters/Cache/SLRUFileCachePriority.h @@ -72,7 +72,12 @@ public: bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CachePriorityGuard::Lock &) override; + FileCachePriorityPtr copy() const { return std::make_unique(max_size, max_elements, size_ratio, probationary_queue.state, protected_queue.state); } + private: + using LRUIterator = LRUFileCachePriority::LRUIterator; + using LRUQueue = std::list; + double size_ratio; LRUFileCachePriority protected_queue; LRUFileCachePriority probationary_queue; diff --git a/src/Interpreters/DatabaseCatalog.h b/src/Interpreters/DatabaseCatalog.h index 83a302f117d..308d1b33e8b 100644 --- a/src/Interpreters/DatabaseCatalog.h +++ b/src/Interpreters/DatabaseCatalog.h @@ -266,6 +266,10 @@ public: void triggerReloadDisksTask(const Strings & new_added_disks); + void stopReplicatedDDLQueries(); + void startReplicatedDDLQueries(); + bool canPerformReplicatedDDLQueries() const; + private: // The global instance of database catalog. unique_ptr is to allow // deferred initialization. Thought I'd use std::optional, but I can't @@ -361,6 +365,8 @@ private: std::mutex reload_disks_mutex; std::set disks_to_reload; static constexpr time_t DBMS_DEFAULT_DISK_RELOAD_PERIOD_SEC = 5; + + std::atomic replicated_ddl_queries_enabled = false; }; diff --git a/src/Interpreters/InterpreterSystemQuery.h b/src/Interpreters/InterpreterSystemQuery.h index 3d667fcaef0..82d55125927 100644 --- a/src/Interpreters/InterpreterSystemQuery.h +++ b/src/Interpreters/InterpreterSystemQuery.h @@ -82,6 +82,9 @@ private: AccessRightsElements getRequiredAccessForDDLOnCluster() const; void startStopAction(StorageActionBlockType action_type, bool start); + + void stopReplicatedDDLQueries(); + void startReplicatedDDLQueries(); }; diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 8601558b788..84f6746ec58 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -40,7 +40,6 @@ class MutationsInterpreter { private: struct Stage; - public: struct Settings { @@ -112,6 +111,10 @@ public: MutationKind::MutationKindEnum getMutationKind() const { return mutation_kind.mutation_kind; } + /// Returns a chain of actions that can be + /// applied to block to execute mutation commands. + std::vector getMutationActions() const; + /// Internal class which represents a data part for MergeTree /// or just storage for other storages. /// The main idea is to create a dedicated reading from MergeTree part. diff --git a/src/Interpreters/Session.h b/src/Interpreters/Session.h index ab4bc53b6f1..0a20dd896a9 100644 --- a/src/Interpreters/Session.h +++ b/src/Interpreters/Session.h @@ -98,7 +98,6 @@ public: /// Closes and removes session void closeSession(const String & session_id); - private: std::shared_ptr getSessionLog() const; ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const; diff --git a/src/Interpreters/Squashing.cpp b/src/Interpreters/Squashing.cpp index 8122800f882..02d1ae528ac 100644 --- a/src/Interpreters/Squashing.cpp +++ b/src/Interpreters/Squashing.cpp @@ -19,6 +19,7 @@ Squashing::Squashing(Block header_, size_t min_block_size_rows_, size_t min_bloc , min_block_size_bytes(min_block_size_bytes_) , header(header_) { + LOG_TEST(getLogger("Squashing"), "header columns {}", header.columns()); } Chunk Squashing::flush() diff --git a/src/Interpreters/executeDDLQueryOnCluster.cpp b/src/Interpreters/executeDDLQueryOnCluster.cpp index d7d9da2a367..c5d58a873fb 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.cpp +++ b/src/Interpreters/executeDDLQueryOnCluster.cpp @@ -14,6 +14,7 @@ #include #include #include +#include "Parsers/ASTSystemQuery.h" #include #include #include @@ -93,6 +94,12 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context, if (!context->getSettingsRef()[Setting::allow_distributed_ddl]) throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Distributed DDL queries are prohibited for the user"); + bool is_system_query = dynamic_cast(query_ptr.get()) != nullptr; + bool replicated_ddl_queries_enabled = DatabaseCatalog::instance().canPerformReplicatedDDLQueries(); + + if (!is_system_query && !replicated_ddl_queries_enabled) + throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Replicated DDL queries are disabled"); + if (const auto * query_alter = query_ptr->as()) { for (const auto & command : query_alter->command_list->children) diff --git a/src/Parsers/CommonParsers.h b/src/Parsers/CommonParsers.h index 8ea9fb12b86..83b7eb71d64 100644 --- a/src/Parsers/CommonParsers.h +++ b/src/Parsers/CommonParsers.h @@ -99,6 +99,7 @@ namespace DB MR_MACROS(COMPRESSION, "COMPRESSION") \ MR_MACROS(CONST, "CONST") \ MR_MACROS(CONSTRAINT, "CONSTRAINT") \ + MR_MACROS(CONNECTIONS, "CONNECTIONS") \ MR_MACROS(CREATE_POLICY, "CREATE POLICY") \ MR_MACROS(CREATE_PROFILE, "CREATE PROFILE") \ MR_MACROS(CREATE_QUOTA, "CREATE QUOTA") \ diff --git a/src/Parsers/IAST.cpp b/src/Parsers/IAST.cpp index 2b581f20e3b..0b1dff556f6 100644 --- a/src/Parsers/IAST.cpp +++ b/src/Parsers/IAST.cpp @@ -9,6 +9,7 @@ #include #include #include + #include namespace DB diff --git a/src/Server/CloudPlacementInfo.cpp b/src/Server/CloudPlacementInfo.cpp index d8810bb30de..08b4e2132ad 100644 --- a/src/Server/CloudPlacementInfo.cpp +++ b/src/Server/CloudPlacementInfo.cpp @@ -53,6 +53,9 @@ PlacementInfo & PlacementInfo::instance() void PlacementInfo::initialize(const Poco::Util::AbstractConfiguration & config) try { + if (initialized) + return; + if (!config.has(DB::PlacementInfo::PLACEMENT_CONFIG_PREFIX)) { availability_zone = ""; diff --git a/src/Storages/MergeTree/FutureMergedMutatedPart.h b/src/Storages/MergeTree/FutureMergedMutatedPart.h index 09fb7b01678..ca607bb4e33 100644 --- a/src/Storages/MergeTree/FutureMergedMutatedPart.h +++ b/src/Storages/MergeTree/FutureMergedMutatedPart.h @@ -22,6 +22,7 @@ struct FutureMergedMutatedPart MergeTreeDataPartFormat part_format; MergeTreePartInfo part_info; MergeTreeData::DataPartsVector parts; + std::vector blocking_parts_to_remove; MergeType merge_type = MergeType::Regular; const MergeTreePartition & getPartition() const { return parts.front()->partition; } diff --git a/src/Storages/MergeTree/IMergeTreeReader.h b/src/Storages/MergeTree/IMergeTreeReader.h index d799ce57b40..c68617d3995 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.h +++ b/src/Storages/MergeTree/IMergeTreeReader.h @@ -18,6 +18,7 @@ public: using ValueSizeMap = std::map; using VirtualFields = std::unordered_map; using DeserializeBinaryBulkStateMap = std::map; + using FileStreams = std::map>; IMergeTreeReader( MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_, diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp index fa6640409e5..859d6f58f40 100644 --- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp @@ -372,6 +372,8 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite { part = merge_task->getFuture().get(); + part->is_prewarmed = true; + storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, *transaction_ptr); /// Why we reset task here? Because it holds shared pointer to part and tryRemovePartImmediately will /// not able to remove the part and will throw an exception (because someone holds the pointer). diff --git a/src/Storages/MergeTree/MergeProjectionPartsTask.cpp b/src/Storages/MergeTree/MergeProjectionPartsTask.cpp index 4e1bb2f11a7..34cd925a8c6 100644 --- a/src/Storages/MergeTree/MergeProjectionPartsTask.cpp +++ b/src/Storages/MergeTree/MergeProjectionPartsTask.cpp @@ -83,6 +83,9 @@ bool MergeProjectionPartsTask::executeStep() ".tmp_proj"); next_level_parts.push_back(executeHere(tmp_part_merge_task)); + /// FIXME (alesapin) we should use some temporary storage for this, + /// not commit each subprojection part + next_level_parts.back()->getDataPartStorage().commitTransaction(); next_level_parts.back()->is_temp = true; } diff --git a/src/Storages/MergeTree/MergeTreeDataFormatVersion.h b/src/Storages/MergeTree/MergeTreeDataFormatVersion.h index 0a84f08ea71..a61938a993c 100644 --- a/src/Storages/MergeTree/MergeTreeDataFormatVersion.h +++ b/src/Storages/MergeTree/MergeTreeDataFormatVersion.h @@ -8,7 +8,7 @@ namespace DB STRONG_TYPEDEF(UInt32, MergeTreeDataFormatVersion) -const MergeTreeDataFormatVersion MERGE_TREE_DATA_OLD_FORMAT_VERSION {0}; -const MergeTreeDataFormatVersion MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING {1}; +static constexpr MergeTreeDataFormatVersion MERGE_TREE_DATA_OLD_FORMAT_VERSION {0}; +static constexpr MergeTreeDataFormatVersion MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING {1}; } diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 71fcb93f369..6d209b9f931 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -106,9 +106,11 @@ public: PreformattedMessage & out_disable_reason, bool dry_run = false); + /// Actually the most fresh partition with biggest modification_time String getBestPartitionToOptimizeEntire(const PartitionsInfo & partitions_info) const; /// Useful to quickly get a list of partitions that contain parts that we may want to merge + /// The result is limited by top_number_of_partitions_to_consider_for_merge PartitionIdsHint getPartitionsThatMayBeMerged( size_t max_total_size_to_merge, const AllowedMergingPredicate & can_merge_callback, diff --git a/src/Storages/MergeTree/MergeTreeDataPartType.h b/src/Storages/MergeTree/MergeTreeDataPartType.h index 8177809d41e..a59ccc2fab1 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartType.h +++ b/src/Storages/MergeTree/MergeTreeDataPartType.h @@ -45,6 +45,7 @@ public: enum Value { Full, + Packed, Unknown, }; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index 58a67fc4ba2..388737915ab 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -179,8 +179,8 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk( throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't take information about index granularity from blocks, when non empty index_granularity array specified"); - if (!getDataPartStorage().exists()) - getDataPartStorage().createDirectories(); + /// We don't need to check if it exists or not, createDirectories doesn't throw + getDataPartStorage().createDirectories(); if (settings.rewrite_primary_key) initPrimaryIndex(); diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp index 2af7abc17f9..9211ab51ad5 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp @@ -108,6 +108,14 @@ std::optional MergeTreeIndexGranularityInfo::getMarksTypeFromFilesyste return {}; } +MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo( + MarkType mark_type_, size_t index_granularity_, size_t index_granularity_bytes_) + : mark_type(mark_type_) + , fixed_index_granularity(index_granularity_) + , index_granularity_bytes(index_granularity_bytes_) +{ +} + MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_) : MergeTreeIndexGranularityInfo(storage, {storage.canUseAdaptiveGranularity(), (*storage.getSettings())[MergeTreeSetting::compress_marks], type_.getValue()}) { diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h index 87445c99ade..b302d6b1a4b 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.h @@ -49,6 +49,7 @@ public: MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MarkType mark_type_); MergeTreeIndexGranularityInfo(MergeTreeDataPartType type_, bool is_adaptive_, size_t index_granularity_, size_t index_granularity_bytes_); + MergeTreeIndexGranularityInfo(MarkType mark_type_, size_t index_granularity_, size_t index_granularity_bytes_); void changeGranularityIfRequired(const IDataPartStorage & data_part_storage); diff --git a/src/Storages/MergeTree/MergeTreeMutationStatus.cpp b/src/Storages/MergeTree/MergeTreeMutationStatus.cpp index 6553054774e..e0214d6a79d 100644 --- a/src/Storages/MergeTree/MergeTreeMutationStatus.cpp +++ b/src/Storages/MergeTree/MergeTreeMutationStatus.cpp @@ -26,11 +26,11 @@ void checkMutationStatus(std::optional & status, const throw Exception( ErrorCodes::UNFINISHED, "Exception happened during execution of mutation{} '{}' with part '{}' reason: '{}'. This error maybe retryable or not. " - "In case of unretryable error, mutation can be killed with KILL MUTATION query", + "In case of unretryable error, mutation can be killed with KILL MUTATION query \n\n{}\n", mutation_ids.size() > 1 ? "s" : "", boost::algorithm::join(mutation_ids, ", "), status->latest_failed_part, - status->latest_fail_reason); + status->latest_fail_reason, StackTrace().toString()); } } diff --git a/src/Storages/MergeTree/MergeTreePartInfo.h b/src/Storages/MergeTree/MergeTreePartInfo.h index f128722b03b..28b043fcf20 100644 --- a/src/Storages/MergeTree/MergeTreePartInfo.h +++ b/src/Storages/MergeTree/MergeTreePartInfo.h @@ -46,6 +46,13 @@ struct MergeTreePartInfo < std::forward_as_tuple(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level, rhs.mutation); } + bool operator>(const MergeTreePartInfo & rhs) const + { + return std::forward_as_tuple(partition_id, min_block, max_block, level, mutation) + > std::forward_as_tuple(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level, rhs.mutation); + } + + bool operator==(const MergeTreePartInfo & rhs) const { return !(*this != rhs); diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.h b/src/Storages/MergeTree/MergeTreeRangeReader.h index 7acc8cd88b4..13ce14e02ec 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -35,7 +35,7 @@ struct PrewhereExprStep bool remove_filter_column = false; bool need_filter = false; - /// Some PREWHERE steps should be executed without conversions. + /// Some PREWHERE steps should be executed without conversions (e.g. early mutation steps) /// A step without alter conversion cannot be executed after step with alter conversions. bool perform_alter_conversions = false; }; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 95469337f8a..4de52213869 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -3,8 +3,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -690,6 +690,8 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: /// /// metadata_snapshot->check(part->getColumns()); + part->is_prewarmed = true; + auto block_id_path = getBlockIdPath(storage.zookeeper_path, block_id); CommitRetryContext retry_context; diff --git a/src/Storages/MergeTree/checkDataPart.cpp b/src/Storages/MergeTree/checkDataPart.cpp index 2a1ddf32431..34e699bcef7 100644 --- a/src/Storages/MergeTree/checkDataPart.cpp +++ b/src/Storages/MergeTree/checkDataPart.cpp @@ -135,7 +135,6 @@ bool isRetryableException(std::exception_ptr exception_ptr) } } - static IMergeTreeDataPart::Checksums checkDataPart( MergeTreeData::DataPartPtr data_part, const IDataPartStorage & data_part_storage, @@ -422,6 +421,7 @@ IMergeTreeDataPart::Checksums checkDataPart( } ReadSettings read_settings; + read_settings.read_through_distributed_cache = false; read_settings.enable_filesystem_cache = false; read_settings.enable_filesystem_cache_log = false; read_settings.enable_filesystem_read_prefetches_log = false; diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h index f08765367fa..2e7433dc7b8 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.h +++ b/src/Storages/ObjectStorage/S3/Configuration.h @@ -5,6 +5,7 @@ #if USE_AWS_S3 #include #include +#include namespace DB { diff --git a/src/Storages/StorageGenerateRandom.cpp b/src/Storages/StorageGenerateRandom.cpp index 23ee7a18b53..23df9bfa1c7 100644 --- a/src/Storages/StorageGenerateRandom.cpp +++ b/src/Storages/StorageGenerateRandom.cpp @@ -150,6 +150,7 @@ size_t estimateValueSize( } } +} ColumnPtr fillColumnWithRandomData( const DataTypePtr type, @@ -539,6 +540,8 @@ ColumnPtr fillColumnWithRandomData( } } +namespace +{ class GenerateSource : public ISource { diff --git a/src/Storages/TableZnodeInfo.h b/src/Storages/TableZnodeInfo.h index 729a88e7509..4e3ffb44056 100644 --- a/src/Storages/TableZnodeInfo.h +++ b/src/Storages/TableZnodeInfo.h @@ -17,6 +17,8 @@ struct StorageID; class ASTCreateQuery; class Context; using ContextPtr = std::shared_ptr; +class IDatabase; +using DatabasePtr = std::shared_ptr; /// Helper for replicated tables that use zookeeper for coordination among replicas. /// Handles things like: From 4b5ddd2d81d83f5c2e898da2bd4d49ec626ec04a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 25 Oct 2024 21:35:32 +0200 Subject: [PATCH 2/4] Add various trash --- src/Databases/DatabaseReplicated.cpp | 12 +++++++++--- .../ObjectStorages/MetadataStorageFromDisk.h | 2 -- src/Interpreters/DatabaseCatalog.cpp | 15 +++++++++++++++ src/Interpreters/MutationsInterpreter.h | 4 ---- src/Storages/MergeTree/MergeFromLogEntryTask.cpp | 2 -- .../MergeTree/ReplicatedMergeTreeSink.cpp | 2 -- 6 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index d4eaaf750cd..387667b1b42 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -85,6 +85,7 @@ namespace ErrorCodes extern const int NO_ACTIVE_REPLICAS; extern const int CANNOT_GET_REPLICATED_DATABASE_SNAPSHOT; extern const int CANNOT_RESTORE_TABLE; + extern const int QUERY_IS_PROHIBITED; extern const int SUPPORT_IS_DISABLED; } @@ -1057,6 +1058,9 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex { waitDatabaseStarted(); + if (!DatabaseCatalog::instance().canPerformReplicatedDDLQueries()) + throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Replicated DDL queries are disabled"); + if (query_context->getCurrentTransaction() && query_context->getSettingsRef()[Setting::throw_on_unsupported_query_inside_transaction]) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Distributed DDL queries inside transactions are not supported"); @@ -1237,14 +1241,16 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep String query = fmt::format("CREATE DATABASE IF NOT EXISTS {} ENGINE=Ordinary", backQuoteIfNeed(to_db_name)); auto query_context = Context::createCopy(getContext()); query_context->setSetting("allow_deprecated_database_ordinary", 1); - executeQuery(query, query_context, QueryFlags{.internal = true}); + query_context->setSetting("cloud_mode", false); + executeQuery(query, query_context, QueryFlags{ .internal = true }); /// But we want to avoid discarding UUID of ReplicatedMergeTree tables, because it will not work /// if zookeeper_path contains {uuid} macro. Replicated database do not recreate replicated tables on recovery, /// so it's ok to save UUID of replicated table. query = fmt::format("CREATE DATABASE IF NOT EXISTS {} ENGINE=Atomic", backQuoteIfNeed(to_db_name_replicated)); query_context = Context::createCopy(getContext()); - executeQuery(query, query_context, QueryFlags{.internal = true}); + query_context->setSetting("cloud_mode", false); + executeQuery(query, query_context, QueryFlags{ .internal = true }); } size_t moved_tables = 0; @@ -1634,7 +1640,7 @@ void DatabaseReplicated::dropTable(ContextPtr local_context, const String & tabl auto table = tryGetTable(table_name, getContext()); if (!table) throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} doesn't exist", table_name); - if (table->getName() == "MaterializedView" || table->getName() == "WindowView") + if (table->getName() == "MaterializedView" || table->getName() == "WindowView" || table->getName() == "SharedSet" || table->getName() == "SharedJoin") { /// Avoid recursive locking of metadata_mutex table->dropInnerTableIfAny(sync, local_context); diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h index 922990bfdb7..5d56580a57b 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h +++ b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h @@ -71,8 +71,6 @@ public: DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock & lock) const; DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock & lock) const; - - bool isReadOnly() const override { return disk->isReadOnly(); } }; class MetadataStorageFromDiskTransaction final : public IMetadataTransaction, private MetadataOperationsHolder diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index c92602105c5..dc9ce23ddb9 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -1817,6 +1817,21 @@ void DatabaseCatalog::triggerReloadDisksTask(const Strings & new_added_disks) (*reload_disks_task)->schedule(); } +void DatabaseCatalog::stopReplicatedDDLQueries() +{ + replicated_ddl_queries_enabled = false; +} + +void DatabaseCatalog::startReplicatedDDLQueries() +{ + replicated_ddl_queries_enabled = true; +} + +bool DatabaseCatalog::canPerformReplicatedDDLQueries() const +{ + return replicated_ddl_queries_enabled; +} + static void maybeUnlockUUID(UUID uuid) { if (uuid == UUIDHelpers::Nil) diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 84f6746ec58..901cd13cd2f 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -111,10 +111,6 @@ public: MutationKind::MutationKindEnum getMutationKind() const { return mutation_kind.mutation_kind; } - /// Returns a chain of actions that can be - /// applied to block to execute mutation commands. - std::vector getMutationActions() const; - /// Internal class which represents a data part for MergeTree /// or just storage for other storages. /// The main idea is to create a dedicated reading from MergeTree part. diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp index 859d6f58f40..fa6640409e5 100644 --- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp @@ -372,8 +372,6 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite { part = merge_task->getFuture().get(); - part->is_prewarmed = true; - storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, *transaction_ptr); /// Why we reset task here? Because it holds shared pointer to part and tryRemovePartImmediately will /// not able to remove the part and will throw an exception (because someone holds the pointer). diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 4de52213869..1ba04fc460d 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -690,8 +690,6 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: /// /// metadata_snapshot->check(part->getColumns()); - part->is_prewarmed = true; - auto block_id_path = getBlockIdPath(storage.zookeeper_path, block_id); CommitRetryContext retry_context; From 9bbebadef4ee3d8599946cafe2f74e1a473763fc Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 25 Oct 2024 23:03:32 +0200 Subject: [PATCH 3/4] Fixup --- programs/server/Server.cpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 35dae614d87..c106a68f360 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -2267,6 +2267,21 @@ try throw; } + bool found_stop_flag = false; + + if (has_zookeeper && global_context->getMacros()->getMacroMap().contains("replica")) + { + auto zookeeper = global_context->getZooKeeper(); + String stop_flag_path = "/clickhouse/stop_replicated_ddl_queries/{replica}"; + stop_flag_path = global_context->getMacros()->expand(stop_flag_path); + found_stop_flag = zookeeper->exists(stop_flag_path); + } + + if (found_stop_flag) + LOG_INFO(log, "Found a stop flag for replicated DDL queries. They will be disabled"); + else + DatabaseCatalog::instance().startReplicatedDDLQueries(); + LOG_DEBUG(log, "Loaded metadata."); if (has_trace_collector) From 8f64803c7e880837d2765319aa3539dd5b799726 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 26 Oct 2024 02:10:16 +0200 Subject: [PATCH 4/4] Sync the test --- tests/queries/0_stateless/01271_show_privileges.reference | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index 17554f5c8a5..0e839ac6fc1 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -142,6 +142,7 @@ SYSTEM REPLICATED SENDS ['SYSTEM STOP REPLICATED SENDS','SYSTEM START REPLICATED SYSTEM SENDS ['SYSTEM STOP SENDS','SYSTEM START SENDS','STOP SENDS','START SENDS'] \N SYSTEM SYSTEM REPLICATION QUEUES ['SYSTEM STOP REPLICATION QUEUES','SYSTEM START REPLICATION QUEUES','STOP REPLICATION QUEUES','START REPLICATION QUEUES'] TABLE SYSTEM SYSTEM VIRTUAL PARTS UPDATE ['SYSTEM STOP VIRTUAL PARTS UPDATE','SYSTEM START VIRTUAL PARTS UPDATE','STOP VIRTUAL PARTS UPDATE','START VIRTUAL PARTS UPDATE'] TABLE SYSTEM +SYSTEM REDUCE BLOCKING PARTS ['SYSTEM STOP REDUCE BLOCKING PARTS','SYSTEM START REDUCE BLOCKING PARTS','STOP REDUCE BLOCKING PARTS','START REDUCE BLOCKING PARTS'] TABLE SYSTEM SYSTEM DROP REPLICA ['DROP REPLICA'] TABLE SYSTEM SYSTEM SYNC REPLICA ['SYNC REPLICA'] TABLE SYSTEM SYSTEM REPLICA READINESS ['SYSTEM REPLICA READY','SYSTEM REPLICA UNREADY'] GLOBAL SYSTEM