Add fsync support for Distributed engine.

Two new settings (by analogy with the MergeTree family) have been added:

- `fsync_after_insert` - Do fsync after every insert. Decreases the
  performance of inserts.

- `fsync_tmp_directory` - Do fsync for temporary directory (that is used
  for async INSERT only) after all part operations (writes, renames,
  etc.).

Refs: #17380 (p1)
This commit is contained in:
Azat Khuzhin 2021-01-07 17:14:41 +03:00
parent b2577327f1
commit b5ace27014
11 changed files with 160 additions and 6 deletions

View File

@ -25,10 +25,19 @@ The Distributed engine accepts parameters:
- [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting
- [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes) for the examples
It also accepts the following settings:
- `fsync_after_insert` - Do fsync after every insert. Decreases the performance of inserts (only for async INSERT, i.e. `insert_distributed_sync=false`),
- `fsync_tmp_directory` - Do fsync for temporary directory (that is used for async INSERT only) after all part operations (writes, renames, etc.).
Example:
``` sql
Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
fsync_after_insert=0,
fsync_tmp_directory=0;
```
Data will be read from all servers in the `logs` cluster, from the default.hits table located on every server in the cluster.

View File

@ -29,6 +29,7 @@
#include <Common/escapeForFileName.h>
#include <Common/CurrentThread.h>
#include <Common/createHardLink.h>
#include <Common/DirectorySyncGuard.h>
#include <common/logger_useful.h>
#include <ext/range.h>
#include <ext/scope_guard.h>
@ -588,6 +589,10 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
const auto & settings = context.getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();
bool fsync = distributed_settings.fsync_after_insert;
bool dir_fsync = distributed_settings.fsync_tmp_directory;
std::string compression_method = Poco::toUpper(settings.network_compression_method.toString());
std::optional<int> compression_level;
@ -603,14 +608,15 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
std::string first_file_tmp_path{};
auto reservation = storage.getStoragePolicy()->reserveAndCheck(block.bytes());
auto disk = reservation->getDisk()->getPath();
const auto disk = reservation->getDisk();
auto disk_path = disk->getPath();
auto data_path = storage.getRelativeDataPath();
auto it = dir_names.begin();
/// on first iteration write block to a temporary directory for subsequent
/// hardlinking to ensure the inode is not freed until we're done
{
const std::string path(disk + data_path + *it);
const std::string path(disk_path + data_path + *it);
Poco::File(path).createDirectory();
const std::string tmp_path(path + "/tmp/");
@ -622,6 +628,13 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
/// Write batch to temporary location
{
std::optional<DirectorySyncGuard> tmp_path_sync_guard;
if (dir_fsync)
{
const std::string relative_tmp_path(data_path + *it + "/tmp/");
tmp_path_sync_guard.emplace(disk, relative_tmp_path);
}
WriteBufferFromFile out{first_file_tmp_path};
CompressedWriteBuffer compress{out, compression_codec};
NativeBlockOutputStream stream{compress, DBMS_TCP_PROTOCOL_VERSION, block.cloneEmpty()};
@ -647,6 +660,10 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
out.finalize();
if (fsync)
out.sync();
}
// Create hardlink here to reuse increment number
@ -658,10 +675,10 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
/// Make hardlinks
for (; it != dir_names.end(); ++it)
{
const std::string path(disk + data_path + *it);
const std::string path(disk_path + data_path + *it);
Poco::File(path).createDirectory();
const std::string block_file_path(path + '/' + toString(storage.file_names_increment.get()) + ".bin");
const std::string block_file_path(path + '/' + toString(storage.file_names_increment.get()) + ".bin");
createHardLink(first_file_tmp_path, block_file_path);
}
@ -673,7 +690,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms;
for (const auto & dir_name : dir_names)
{
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
auto & directory_monitor = storage.requireDirectoryMonitor(disk_path, dir_name);
directory_monitor.scheduleAfter(sleep_ms.totalMilliseconds());
}
}

View File

@ -0,0 +1,42 @@
#include <Storages/Distributed/DistributedSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)
/// Loads the settings from the SETTINGS clause of the storage definition.
/// When the definition has no SETTINGS clause, an empty non-standalone
/// SET node is attached to it instead and nothing is applied.
void DistributedSettings::loadFromQuery(ASTStorage & storage_def)
{
    if (!storage_def.settings)
    {
        /// No SETTINGS clause in the query: attach an empty one to the definition.
        auto empty_settings = std::make_shared<ASTSetQuery>();
        empty_settings->is_standalone = false;
        storage_def.set(storage_def.settings, empty_settings);
        return;
    }

    try
    {
        applyChanges(storage_def.settings->changes);
    }
    catch (Exception & error)
    {
        /// Append the engine name so that an unknown-setting error says which storage rejected it.
        if (ErrorCodes::UNKNOWN_SETTING == error.code())
            error.addMessage("for storage " + storage_def.engine->name);
        throw;
    }
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <Core/Defines.h>
#include <Core/BaseSettings.h>
namespace Poco::Util
{
class AbstractConfiguration;
}
namespace DB
{
class ASTStorage;
#define LIST_OF_DISTRIBUTED_SETTINGS(M) \
M(Bool, fsync_after_insert, false, "Do fsync for every inserted. Will decreases performance of inserts (only for async INSERT, i.e. insert_distributed_sync=false)", 0) \
M(Bool, fsync_tmp_directory, false, "Do fsync for temporary directory (that is used for async INSERT only) after all part operations (writes, renames, etc.).", 0) \
DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)
/** Settings for the Distributed engine.
  * The list of settings (names, types, defaults, descriptions) comes from
  * DistributedSettingsTraits, which is declared from LIST_OF_DISTRIBUTED_SETTINGS.
 */
struct DistributedSettings : public BaseSettings<DistributedSettingsTraits>
{
/// Loads settings from the SETTINGS clause of the given storage definition.
/// Takes a non-const reference: the definition may be modified.
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -363,6 +363,7 @@ StorageDistributed::StorageDistributed(
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach_,
ClusterPtr owned_cluster_)
: IStorage(id_)
@ -374,6 +375,7 @@ StorageDistributed::StorageDistributed(
, cluster_name(global_context.getMacros()->expand(cluster_name_))
, has_sharding_key(sharding_key_)
, relative_data_path(relative_data_path_)
, distributed_settings(distributed_settings_)
, rng(randomSeed())
{
StorageInMemoryMetadata storage_metadata;
@ -417,9 +419,10 @@ StorageDistributed::StorageDistributed(
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach,
ClusterPtr owned_cluster_)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, attach, std::move(owned_cluster_))
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, distributed_settings_, attach, std::move(owned_cluster_))
{
remote_table_function_ptr = std::move(remote_table_function_ptr_);
}
@ -954,6 +957,8 @@ void registerStorageDistributed(StorageFactory & factory)
* - constant expression with string result, like currentDatabase();
* -- string literal as specific case;
* - empty string means 'use default database from cluster'.
*
* Distributed engine also supports SETTINGS clause.
*/
ASTs & engine_args = args.engine_args;
@ -995,6 +1000,13 @@ void registerStorageDistributed(StorageFactory & factory)
", but should be one of integer type", ErrorCodes::TYPE_MISMATCH);
}
/// TODO: move some arguments from the arguments to the SETTINGS.
DistributedSettings distributed_settings;
if (args.storage_def->settings)
{
distributed_settings.loadFromQuery(*args.storage_def);
}
return StorageDistributed::create(
args.table_id, args.columns, args.constraints,
remote_database, remote_table, cluster_name,
@ -1002,9 +1014,11 @@ void registerStorageDistributed(StorageFactory & factory)
sharding_key,
storage_policy,
args.relative_data_path,
distributed_settings,
args.attach);
},
{
.supports_settings = true,
.supports_parallel_insert = true,
.source_access_type = AccessType::REMOTE,
});

View File

@ -4,6 +4,7 @@
#include <Storages/IStorage.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Common/SimpleIncrement.h>
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
@ -127,6 +128,8 @@ public:
size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);
const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;
@ -162,6 +165,7 @@ protected:
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach_,
ClusterPtr owned_cluster_ = {});
@ -175,6 +179,7 @@ protected:
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
bool attach,
ClusterPtr owned_cluster_ = {});
@ -188,6 +193,8 @@ protected:
/// Other volumes will be ignored. It's needed to allow using the same multi-volume policy both for Distributed and other engines.
VolumePtr data_volume;
DistributedSettings distributed_settings;
struct ClusterNodeData
{
std::unique_ptr<StorageDistributedDirectoryMonitor> directory_monitor;

View File

@ -17,6 +17,7 @@ SRCS(
ConstraintsDescription.cpp
Distributed/DirectoryMonitor.cpp
Distributed/DistributedBlockOutputStream.cpp
Distributed/DistributedSettings.cpp
IStorage.cpp
IndicesDescription.cpp
JoinSettings.cpp

View File

@ -211,6 +211,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, con
ASTPtr{},
String{},
String{},
DistributedSettings{},
false,
cluster)
: StorageDistributed::create(
@ -224,6 +225,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, con
ASTPtr{},
String{},
String{},
DistributedSettings{},
false,
cluster);

View File

@ -0,0 +1,6 @@
no fsync
0
90
fsync
90
180

View File

@ -0,0 +1,24 @@
-- Smoke test: INSERT into a Distributed table without and with the
-- fsync_after_insert / fsync_tmp_directory settings.
-- The same scenario is run twice; only the SETTINGS clause of the Distributed table differs.
drop table if exists dist_01643;
drop table if exists data_01643;
create table data_01643 (key Int) engine=Memory();
select 'no fsync';
create table dist_01643 as data_01643 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01643, key);
-- Sends are stopped before the insert; the accompanying reference output expects 0
-- for the first sum, i.e. nothing has reached data_01643 until the explicit flush.
system stop distributed sends dist_01643;
insert into dist_01643 select * from numbers(10) settings prefer_localhost_replica=0;
select sum(*) from dist_01643;
system flush distributed dist_01643;
-- After the flush the reference output expects 90.
-- NOTE(review): 90 rather than 45 presumably because both shards of test_cluster_two_shards
-- read the same local data_01643 - confirm against the cluster config.
select sum(*) from dist_01643;
drop table dist_01643;
select 'fsync';
-- Second run with both fsync settings enabled. data_01643 is not recreated in between,
-- so the reference output expects 90 before the flush and 180 after it.
create table dist_01643 as data_01643 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01643, key) settings fsync_after_insert=1, fsync_tmp_directory=1;
system stop distributed sends dist_01643;
insert into dist_01643 select * from numbers(10) settings prefer_localhost_replica=0;
select sum(*) from dist_01643;
system flush distributed dist_01643;
select sum(*) from dist_01643;
drop table dist_01643;
drop table if exists data_01643;

View File

@ -182,3 +182,4 @@
01601_custom_tld
01636_nullable_fuzz2
01639_distributed_sync_insert_zero_rows
01644_distributed_async_insert_fsync_smoke