From b5ace2701442776bddcf7b6196860e3f7b797d0b Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 7 Jan 2021 17:14:41 +0300 Subject: [PATCH] Add fsync support for Distributed engine. Two new settings (by analogy with the MergeTree family) have been added: - `fsync_after_insert` - Do fsync for every inserted block. Will decrease insert performance. - `fsync_tmp_directory` - Do fsync for the temporary directory (used for async INSERT only) after all part operations (writes, renames, etc.). Refs: #17380 (p1) --- .../table-engines/special/distributed.md | 9 ++++ .../DistributedBlockOutputStream.cpp | 27 +++++++++--- .../Distributed/DistributedSettings.cpp | 42 +++++++++++++++++++ .../Distributed/DistributedSettings.h | 31 ++++++++++++++ src/Storages/StorageDistributed.cpp | 16 ++++++- src/Storages/StorageDistributed.h | 7 ++++ src/Storages/ya.make | 1 + src/TableFunctions/TableFunctionRemote.cpp | 2 + ...ributed_async_insert_fsync_smoke.reference | 6 +++ ...4_distributed_async_insert_fsync_smoke.sql | 24 +++++++++++ .../queries/0_stateless/arcadia_skip_list.txt | 1 + 11 files changed, 160 insertions(+), 6 deletions(-) create mode 100644 src/Storages/Distributed/DistributedSettings.cpp create mode 100644 src/Storages/Distributed/DistributedSettings.h create mode 100644 tests/queries/0_stateless/01644_distributed_async_insert_fsync_smoke.reference create mode 100644 tests/queries/0_stateless/01644_distributed_async_insert_fsync_smoke.sql diff --git a/docs/en/engines/table-engines/special/distributed.md b/docs/en/engines/table-engines/special/distributed.md index 9f96ca3fe8c..f8194ced23f 100644 --- a/docs/en/engines/table-engines/special/distributed.md +++ b/docs/en/engines/table-engines/special/distributed.md @@ -25,10 +25,19 @@ The Distributed engine accepts parameters: - [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting - [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes) for the examples +It also accepts the following settings: + +- `fsync_after_insert` - Do fsync for every inserted block. Will decrease insert performance (applies to async INSERT only, i.e. `insert_distributed_sync=false`). + +- `fsync_tmp_directory` - Do fsync for the temporary directory (used for async INSERT only) after all part operations (writes, renames, etc.). + Example: ``` sql Distributed(logs, default, hits[, sharding_key[, policy_name]]) +SETTINGS + fsync_after_insert=0, + fsync_tmp_directory=0; ``` Data will be read from all servers in the `logs` cluster, from the default.hits table located on every server in the cluster. 
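Both settings map onto the standard two-step POSIX durability pattern: fsync() the written file so its contents reach disk, and fsync() the containing directory so the directory entry itself (file creation, renames, hardlinks) also survives a crash. The snippet below is a minimal standalone sketch of that pattern, assuming plain POSIX calls rather than the ClickHouse classes touched by this patch; the function and variable names are illustrative only.

``` cpp
#include <cstdio>
#include <cstring>
#include <fcntl.h>
#include <unistd.h>

/// Write `data` to `dir/name` durably: flush the file itself (analogue of
/// fsync_after_insert), then flush the directory so the new entry survives
/// a crash too (analogue of fsync_tmp_directory). Illustrative names only.
bool write_durably(const char * dir, const char * name, const char * data)
{
    char path[4096];
    std::snprintf(path, sizeof(path), "%s/%s", dir, name);

    int fd = ::open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0)
        return false;
    const ssize_t len = static_cast<ssize_t>(std::strlen(data));
    bool ok = ::write(fd, data, len) == len;
    ok = ok && ::fsync(fd) == 0;        /// file contents and metadata
    ::close(fd);

    int dir_fd = ::open(dir, O_RDONLY | O_DIRECTORY);
    if (dir_fd < 0)
        return false;
    ok = ok && ::fsync(dir_fd) == 0;    /// the directory entry itself
    ::close(dir_fd);
    return ok;
}
```

With both settings left at their default of 0 the engine behaves as before; enabling them trades some insert throughput for durability of the queued `.bin` files.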
diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 8d901028057..6c6399bf6d5 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -588,6 +589,10 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector & dir_names) { const auto & settings = context.getSettingsRef(); + const auto & distributed_settings = storage.getDistributedSettingsRef(); + + bool fsync = distributed_settings.fsync_after_insert; + bool dir_fsync = distributed_settings.fsync_tmp_directory; std::string compression_method = Poco::toUpper(settings.network_compression_method.toString()); std::optional compression_level; @@ -603,14 +608,15 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: std::string first_file_tmp_path{}; auto reservation = storage.getStoragePolicy()->reserveAndCheck(block.bytes()); - auto disk = reservation->getDisk()->getPath(); + const auto disk = reservation->getDisk(); + auto disk_path = disk->getPath(); auto data_path = storage.getRelativeDataPath(); auto it = dir_names.begin(); /// on first iteration write block to a temporary directory for subsequent /// hardlinking to ensure the inode is not freed until we're done { - const std::string path(disk + data_path + *it); + const std::string path(disk_path + data_path + *it); Poco::File(path).createDirectory(); const std::string tmp_path(path + "/tmp/"); @@ -622,6 +628,13 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: /// Write batch to temporary location { + std::optional tmp_path_sync_guard; + if (dir_fsync) + { + const std::string relative_tmp_path(data_path + *it + "/tmp/"); + tmp_path_sync_guard.emplace(disk, relative_tmp_path); + } + WriteBufferFromFile out{first_file_tmp_path}; CompressedWriteBuffer compress{out, compression_codec}; NativeBlockOutputStream stream{compress, DBMS_TCP_PROTOCOL_VERSION, block.cloneEmpty()}; @@ -647,6 +660,10 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: stream.writePrefix(); stream.write(block); stream.writeSuffix(); + + out.finalize(); + if (fsync) + out.sync(); } // Create hardlink here to reuse increment number @@ -658,10 +675,10 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: /// Make hardlinks for (; it != dir_names.end(); ++it) { - const std::string path(disk + data_path + *it); + const std::string path(disk_path + data_path + *it); Poco::File(path).createDirectory(); - const std::string block_file_path(path + '/' + toString(storage.file_names_increment.get()) + ".bin"); + const std::string block_file_path(path + '/' + toString(storage.file_names_increment.get()) + ".bin"); createHardLink(first_file_tmp_path, block_file_path); } @@ -673,7 +690,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms; for (const auto & dir_name : dir_names) { - auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name); + auto & directory_monitor = storage.requireDirectoryMonitor(disk_path, dir_name); directory_monitor.scheduleAfter(sleep_ms.totalMilliseconds()); } } diff --git 
a/src/Storages/Distributed/DistributedSettings.cpp b/src/Storages/Distributed/DistributedSettings.cpp new file mode 100644 index 00000000000..555aeba7c58 --- /dev/null +++ b/src/Storages/Distributed/DistributedSettings.cpp @@ -0,0 +1,42 @@ +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_SETTING; +} + +IMPLEMENT_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS) + +void DistributedSettings::loadFromQuery(ASTStorage & storage_def) +{ + if (storage_def.settings) + { + try + { + applyChanges(storage_def.settings->changes); + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::UNKNOWN_SETTING) + e.addMessage("for storage " + storage_def.engine->name); + throw; + } + } + else + { + auto settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + storage_def.set(storage_def.settings, settings_ast); + } +} + +} + diff --git a/src/Storages/Distributed/DistributedSettings.h b/src/Storages/Distributed/DistributedSettings.h new file mode 100644 index 00000000000..9d79feaa05a --- /dev/null +++ b/src/Storages/Distributed/DistributedSettings.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include + + +namespace Poco::Util +{ + class AbstractConfiguration; +} + + +namespace DB +{ +class ASTStorage; + +#define LIST_OF_DISTRIBUTED_SETTINGS(M) \ + M(Bool, fsync_after_insert, false, "Do fsync for every inserted block. Will decrease insert performance (applies to async INSERT only, i.e. insert_distributed_sync=false)", 0) \ + M(Bool, fsync_tmp_directory, false, "Do fsync for the temporary directory (used for async INSERT only) after all part operations (writes, renames, etc.).", 0) \ + +DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS) + + +/** Settings for the Distributed family of engines. 
+ */ +struct DistributedSettings : public BaseSettings +{ + void loadFromQuery(ASTStorage & storage_def); +}; + +} diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index dd99d0f0f27..af3c410872f 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -363,6 +363,7 @@ StorageDistributed::StorageDistributed( const ASTPtr & sharding_key_, const String & storage_policy_name_, const String & relative_data_path_, + const DistributedSettings & distributed_settings_, bool attach_, ClusterPtr owned_cluster_) : IStorage(id_) @@ -374,6 +375,7 @@ StorageDistributed::StorageDistributed( , cluster_name(global_context.getMacros()->expand(cluster_name_)) , has_sharding_key(sharding_key_) , relative_data_path(relative_data_path_) + , distributed_settings(distributed_settings_) , rng(randomSeed()) { StorageInMemoryMetadata storage_metadata; @@ -417,9 +419,10 @@ StorageDistributed::StorageDistributed( const ASTPtr & sharding_key_, const String & storage_policy_name_, const String & relative_data_path_, + const DistributedSettings & distributed_settings_, bool attach, ClusterPtr owned_cluster_) - : StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, attach, std::move(owned_cluster_)) + : StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, distributed_settings_, attach, std::move(owned_cluster_)) { remote_table_function_ptr = std::move(remote_table_function_ptr_); } @@ -954,6 +957,8 @@ void registerStorageDistributed(StorageFactory & factory) * - constant expression with string result, like currentDatabase(); * -- string literal as specific case; * - empty string means 'use default database from cluster'. + * + * Distributed engine also supports SETTINGS clause. */ ASTs & engine_args = args.engine_args; @@ -995,6 +1000,13 @@ void registerStorageDistributed(StorageFactory & factory) ", but should be one of integer type", ErrorCodes::TYPE_MISMATCH); } + /// TODO: move some arguments from the arguments to the SETTINGS. 
+ DistributedSettings distributed_settings; + if (args.storage_def->settings) + { + distributed_settings.loadFromQuery(*args.storage_def); + } + return StorageDistributed::create( args.table_id, args.columns, args.constraints, remote_database, remote_table, cluster_name, @@ -1002,9 +1014,11 @@ void registerStorageDistributed(StorageFactory & factory) sharding_key, storage_policy, args.relative_data_path, + distributed_settings, args.attach); }, { + .supports_settings = true, .supports_parallel_insert = true, .source_access_type = AccessType::REMOTE, }); diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index ce7e48c85a9..083407666f6 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -127,6 +128,8 @@ public: size_t getRandomShardIndex(const Cluster::ShardsInfo & shards); + const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; } + String remote_database; String remote_table; ASTPtr remote_table_function_ptr; @@ -162,6 +165,7 @@ protected: const ASTPtr & sharding_key_, const String & storage_policy_name_, const String & relative_data_path_, + const DistributedSettings & distributed_settings_, bool attach_, ClusterPtr owned_cluster_ = {}); @@ -175,6 +179,7 @@ protected: const ASTPtr & sharding_key_, const String & storage_policy_name_, const String & relative_data_path_, + const DistributedSettings & distributed_settings_, bool attach, ClusterPtr owned_cluster_ = {}); @@ -188,6 +193,8 @@ protected: /// Other volumes will be ignored. It's needed to allow using the same multi-volume policy both for Distributed and other engines. VolumePtr data_volume; + DistributedSettings distributed_settings; + struct ClusterNodeData { std::unique_ptr directory_monitor; diff --git a/src/Storages/ya.make b/src/Storages/ya.make index 27aa9e3ac3f..9169ff54b87 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -17,6 +17,7 @@ SRCS( ConstraintsDescription.cpp Distributed/DirectoryMonitor.cpp Distributed/DistributedBlockOutputStream.cpp + Distributed/DistributedSettings.cpp IStorage.cpp IndicesDescription.cpp JoinSettings.cpp diff --git a/src/TableFunctions/TableFunctionRemote.cpp b/src/TableFunctions/TableFunctionRemote.cpp index a031490b88b..914b7083fca 100644 --- a/src/TableFunctions/TableFunctionRemote.cpp +++ b/src/TableFunctions/TableFunctionRemote.cpp @@ -211,6 +211,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, con ASTPtr{}, String{}, String{}, + DistributedSettings{}, false, cluster) : StorageDistributed::create( @@ -224,6 +225,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, con ASTPtr{}, String{}, String{}, + DistributedSettings{}, false, cluster); diff --git a/tests/queries/0_stateless/01644_distributed_async_insert_fsync_smoke.reference b/tests/queries/0_stateless/01644_distributed_async_insert_fsync_smoke.reference new file mode 100644 index 00000000000..fc919524275 --- /dev/null +++ b/tests/queries/0_stateless/01644_distributed_async_insert_fsync_smoke.reference @@ -0,0 +1,6 @@ +no fsync +0 +90 +fsync +90 +180 diff --git a/tests/queries/0_stateless/01644_distributed_async_insert_fsync_smoke.sql b/tests/queries/0_stateless/01644_distributed_async_insert_fsync_smoke.sql new file mode 100644 index 00000000000..f01bddbcc4f --- /dev/null +++ b/tests/queries/0_stateless/01644_distributed_async_insert_fsync_smoke.sql @@ -0,0 +1,24 @@ +drop table if 
exists dist_01643; +drop table if exists data_01643; + +create table data_01643 (key Int) engine=Memory(); + +select 'no fsync'; +create table dist_01643 as data_01643 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01643, key); +system stop distributed sends dist_01643; +insert into dist_01643 select * from numbers(10) settings prefer_localhost_replica=0; +select sum(*) from dist_01643; +system flush distributed dist_01643; +select sum(*) from dist_01643; +drop table dist_01643; + +select 'fsync'; +create table dist_01643 as data_01643 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01643, key) settings fsync_after_insert=1, fsync_tmp_directory=1; +system stop distributed sends dist_01643; +insert into dist_01643 select * from numbers(10) settings prefer_localhost_replica=0; +select sum(*) from dist_01643; +system flush distributed dist_01643; +select sum(*) from dist_01643; +drop table dist_01643; + +drop table if exists data_01643; diff --git a/tests/queries/0_stateless/arcadia_skip_list.txt b/tests/queries/0_stateless/arcadia_skip_list.txt index 6c6636b923d..a4507555373 100644 --- a/tests/queries/0_stateless/arcadia_skip_list.txt +++ b/tests/queries/0_stateless/arcadia_skip_list.txt @@ -182,3 +182,4 @@ 01601_custom_tld 01636_nullable_fuzz2 01639_distributed_sync_insert_zero_rows +01644_distributed_async_insert_fsync_smoke
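The new DistributedSettings header relies on an X-macro list: LIST_OF_DISTRIBUTED_SETTINGS is expanded by DECLARE_SETTINGS_TRAITS / IMPLEMENT_SETTINGS_TRAITS into the traits that give the settings struct its typed fields and the by-name access that loadFromQuery() uses via applyChanges(). The sketch below is a simplified standalone analogue of that X-macro technique, not ClickHouse's BaseSettings machinery; all names in it are invented for illustration.

``` cpp
#include <iostream>
#include <string>

/// Simplified stand-in for LIST_OF_DISTRIBUTED_SETTINGS: each M(...) entry
/// carries the type, name, default value and description of one setting.
#define LIST_OF_EXAMPLE_SETTINGS(M) \
    M(bool, fsync_after_insert, false, "Do fsync for every inserted block.") \
    M(bool, fsync_tmp_directory, false, "Do fsync for the temporary directory.")

struct ExampleDistributedSettings
{
    /// First expansion: declare one member per setting, with its default.
#define DECLARE_MEMBER(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME = DEFAULT;
    LIST_OF_EXAMPLE_SETTINGS(DECLARE_MEMBER)
#undef DECLARE_MEMBER

    /// Second expansion: a by-name setter, roughly what a loadFromQuery-style
    /// function needs when it applies the changes from a SETTINGS clause.
    bool set(const std::string & name, bool value)
    {
#define TRY_SET(TYPE, NAME, DEFAULT, DESCRIPTION) \
        if (name == #NAME) { NAME = value; return true; }
        LIST_OF_EXAMPLE_SETTINGS(TRY_SET)
#undef TRY_SET
        return false;   /// unknown setting
    }
};

int main()
{
    ExampleDistributedSettings settings;
    settings.set("fsync_after_insert", true);
    std::cout << settings.fsync_after_insert << ' ' << settings.fsync_tmp_directory << '\n';
    return 0;
}
```

Keeping the list in a single macro means the member declarations and the name lookup cannot drift apart, which is why fsync_after_insert and fsync_tmp_directory are declared this way rather than as plain fields.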