Merge pull request #8756 from azat/distributed_storage_configuration

Multiple disks/volumes for storing data to send in the Distributed engine
alexey-milovidov 2020-01-26 02:19:28 +03:00 committed by GitHub
commit 2df93a6d21
9 changed files with 247 additions and 90 deletions
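
In user-facing terms, the commit extends the Distributed engine with an optional fifth argument: the name of a storage policy whose (single) volume supplies the disks on which data queued for asynchronous sends is stored. A minimal sketch of the new syntax, reusing the `logs` cluster and `default.hits` table from the documentation example at the end of this diff; the `'sending'` policy name is hypothetical and would have to exist in the server's storage configuration:

```sql
-- Before: Distributed(cluster, database, table[, sharding_key])
-- After:  Distributed(cluster, database, table[, sharding_key[, policy_name]])
CREATE TABLE dist_hits AS default.hits
ENGINE = Distributed(logs, default, hits, rand(), 'sending');
```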

Storages/Distributed/DirectoryMonitor.cpp

@@ -80,12 +80,11 @@ namespace
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_)
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_)
/// It's important to initialize members before `thread` to avoid race.
: storage(storage_)
, pool(std::move(pool_))
, name(std::move(name_))
, path{storage.path + name + '/'}
, path{path_ + '/'}
, should_batch_inserts(storage.global_context.getSettingsRef().distributed_directory_monitor_batch_inserts)
, min_batched_block_size_rows(storage.global_context.getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.global_context.getSettingsRef().min_insert_block_size_bytes)
@@ -692,10 +691,10 @@ std::string StorageDistributedDirectoryMonitor::getLoggerName() const
return storage.getStorageID().getFullTableName() + ".DirectoryMonitor";
}
void StorageDistributedDirectoryMonitor::updatePath()
void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_path)
{
std::lock_guard lock{mutex};
path = storage.path + name + '/';
path = new_path;
current_batch_file_path = path + "current_batch.txt";
}

Storages/Distributed/DirectoryMonitor.h

@@ -20,13 +20,13 @@ class StorageDistributedDirectoryMonitor
{
public:
StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_);
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_);
~StorageDistributedDirectoryMonitor();
static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
void updatePath();
void updatePath(const std::string & new_path);
void flushAllData();
@@ -47,7 +47,6 @@ private:
StorageDistributed & storage;
const ConnectionPoolPtr pool;
const std::string name;
std::string path;
const bool should_batch_inserts = false;

Storages/Distributed/DistributedBlockOutputStream.cpp

@@ -1,6 +1,7 @@
#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/StorageDistributed.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>
@@ -563,11 +564,12 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
/// write first file, hardlink the others
for (const auto & dir_name : dir_names)
{
const auto & path = storage.getPath() + dir_name + '/';
const auto & [disk, data_path] = storage.getPath();
const std::string path(disk + data_path + dir_name + '/');
/// ensure shard subdirectory creation and notify storage
if (Poco::File(path).createDirectory())
storage.requireDirectoryMonitor(dir_name);
storage.requireDirectoryMonitor(disk, dir_name);
const auto & file_name = toString(storage.file_names_increment.get()) + ".bin";
const auto & block_file_path = path + file_name;

Storages/StorageDistributed.cpp

@@ -3,6 +3,8 @@
#include <DataStreams/OneBlockInputStream.h>
#include <Databases/IDatabase.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/DiskLocal.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypesNumber.h>
@@ -153,12 +155,6 @@ UInt64 getMaximumFileNumber(const std::string & dir_path)
return res;
}
void initializeFileNamesIncrement(const std::string & path, SimpleIncrement & increment)
{
if (!path.empty())
increment.set(getMaximumFileNumber(path));
}
/// the same as DistributedBlockOutputStream::createSelector, should it be static?
IColumn::Selector createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
{
@@ -211,6 +207,7 @@ static ExpressionActionsPtr buildShardingKeyExpression(const ASTPtr & sharding_k
return ExpressionAnalyzer(query, syntax_result, context).getActions(project);
}
StorageDistributed::StorageDistributed(
const StorageID & id_,
const ColumnsDescription & columns_,
@@ -220,6 +217,7 @@ StorageDistributed::StorageDistributed(
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & relative_data_path_,
bool attach_)
: IStorage(id_,
@@ -233,7 +231,8 @@ StorageDistributed::StorageDistributed(
, global_context(context_)
, cluster_name(global_context.getMacros()->expand(cluster_name_))
, has_sharding_key(sharding_key_)
, path(relative_data_path_.empty() ? "" : (context_.getPath() + relative_data_path_))
, storage_policy(storage_policy_)
, relative_data_path(relative_data_path_)
{
setColumns(columns_);
setConstraints(constraints_);
@@ -244,6 +243,9 @@ StorageDistributed::StorageDistributed(
sharding_key_column_name = sharding_key_->getColumnName();
}
if (!relative_data_path.empty())
createStorage();
/// Sanity check. Skip check if the table is already created to allow the server to start.
if (!attach_ && !cluster_name.empty())
{
@@ -262,13 +264,34 @@ StorageDistributed::StorageDistributed(
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & relative_data_path_,
bool attach)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, relative_data_path_, attach)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_, relative_data_path_, attach)
{
remote_table_function_ptr = std::move(remote_table_function_ptr_);
}
void StorageDistributed::createStorage()
{
/// Create default policy with the relative_data_path_
if (storage_policy.empty())
{
std::string path(global_context.getPath());
/// Disk path must end with '/'
if (!path.ends_with('/'))
path += '/';
auto disk = std::make_shared<DiskLocal>("default", path, 0);
volume = std::make_shared<Volume>("default", std::vector<DiskPtr>{disk}, 0);
}
else
{
auto policy = global_context.getStoragePolicySelector()[storage_policy];
if (policy->getVolumes().size() != 1)
throw Exception("Policy for Distributed table, should have exactly one volume", ErrorCodes::BAD_ARGUMENTS);
volume = policy->getVolume(0);
}
}
StoragePtr StorageDistributed::createWithOwnCluster(
const StorageID & table_id_,
@@ -278,7 +301,7 @@ StoragePtr StorageDistributed::createWithOwnCluster(
ClusterPtr owned_cluster_,
const Context & context_)
{
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), String(), false);
res->owned_cluster = std::move(owned_cluster_);
return res;
}
@@ -291,7 +314,7 @@ StoragePtr StorageDistributed::createWithOwnCluster(
ClusterPtr & owned_cluster_,
const Context & context_)
{
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), String(), false);
res->owned_cluster = owned_cluster_;
return res;
}
@@ -400,7 +423,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context & c
const auto & settings = context.getSettingsRef();
/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
if (path.empty() && !owned_cluster && !settings.insert_distributed_sync)
if (!volume && !owned_cluster && !settings.insert_distributed_sync)
{
throw Exception("Storage " + getName() + " must has own data directory to enable asynchronous inserts",
ErrorCodes::BAD_ARGUMENTS);
@@ -453,8 +476,19 @@ void StorageDistributed::alter(const AlterCommands & params, const Context & con
void StorageDistributed::startup()
{
createDirectoryMonitors();
initializeFileNamesIncrement(path, file_names_increment);
if (!volume)
return;
for (const DiskPtr & disk : volume->disks)
createDirectoryMonitors(disk->getPath());
for (const String & path : getDataPaths())
{
UInt64 inc = getMaximumFileNumber(path);
if (inc > file_names_increment.value)
file_names_increment.value.store(inc);
}
LOG_DEBUG(log, "Auto-increment is " << file_names_increment.value);
}
@@ -463,6 +497,18 @@ void StorageDistributed::shutdown()
cluster_nodes_data.clear();
}
Strings StorageDistributed::getDataPaths() const
{
Strings paths;
if (relative_data_path.empty())
return paths;
for (const DiskPtr & disk : volume->disks)
paths.push_back(disk->getPath() + relative_data_path);
return paths;
}
void StorageDistributed::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
@@ -508,33 +554,28 @@ bool StorageDistributed::hasColumn(const String & column_name) const
return virtual_columns.count(column_name) || getColumns().hasPhysical(column_name);
}
void StorageDistributed::createDirectoryMonitors()
void StorageDistributed::createDirectoryMonitors(const std::string & disk)
{
if (path.empty())
return;
const std::string path(disk + relative_data_path);
Poco::File{path}.createDirectories();
std::filesystem::directory_iterator begin(path);
std::filesystem::directory_iterator end;
for (auto it = begin; it != end; ++it)
if (std::filesystem::is_directory(*it))
requireDirectoryMonitor(it->path().filename().string());
requireDirectoryMonitor(disk, it->path().filename().string());
}
void StorageDistributed::requireDirectoryMonitor(const std::string & name)
void StorageDistributed::requireDirectoryMonitor(const std::string & disk, const std::string & name)
{
std::lock_guard lock(cluster_nodes_mutex);
cluster_nodes_data[name].requireDirectoryMonitor(name, *this, monitors_blocker);
}
const std::string path(disk + relative_data_path + name);
const std::string key(disk + name);
ConnectionPoolPtr StorageDistributed::requireConnectionPool(const std::string & name)
{
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[name];
node_data.requireConnectionPool(name, *this);
return node_data.conneciton_pool;
auto & node_data = cluster_nodes_data[key];
node_data.conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(*this, path, node_data.conneciton_pool, monitors_blocker);
}
size_t StorageDistributed::getShardCount() const
@@ -542,25 +583,16 @@ size_t StorageDistributed::getShardCount() const
return getCluster()->getShardCount();
}
std::pair<const std::string &, const std::string &> StorageDistributed::getPath()
{
return {volume->getNextDisk()->getPath(), relative_data_path};
}
ClusterPtr StorageDistributed::getCluster() const
{
return owned_cluster ? owned_cluster : global_context.getCluster(cluster_name);
}
void StorageDistributed::ClusterNodeData::requireConnectionPool(const std::string & name, const StorageDistributed & storage)
{
if (!conneciton_pool)
conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, storage);
}
void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(
const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker)
{
requireConnectionPool(name, storage);
if (!directory_monitor)
directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(storage, name, conneciton_pool, monitor_blocker);
}
void StorageDistributed::ClusterNodeData::flushAllData()
{
directory_monitor->flushAllData();
@@ -635,16 +667,26 @@ void StorageDistributed::flushClusterNodesAllData()
void StorageDistributed::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name,
TableStructureWriteLockHolder &)
{
if (!path.empty())
if (!relative_data_path.empty())
renameOnDisk(new_path_to_table_data);
renameInMemory(new_database_name, new_table_name);
}
void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
{
auto new_path = global_context.getPath() + new_path_to_table_data;
Poco::File(path).renameTo(new_path);
path = new_path;
for (const DiskPtr & disk : volume->disks)
{
const String path(disk->getPath());
auto new_path = path + new_path_to_table_data;
Poco::File(path + relative_data_path).renameTo(new_path);
LOG_DEBUG(log, "Updating path to " << new_path);
std::lock_guard lock(cluster_nodes_mutex);
for (auto & node : cluster_nodes_data)
node.second.directory_monitor->updatePath();
node.second.directory_monitor->updatePath(new_path);
}
renameInMemory(new_database_name, new_table_name);
relative_data_path = new_path_to_table_data;
}
@@ -656,6 +698,7 @@ void registerStorageDistributed(StorageFactory & factory)
* - name of cluster in configuration;
* - name of remote database;
* - name of remote table;
* - policy to store data in;
*
* Remote database may be specified in following form:
* - identifier;
@@ -666,10 +709,15 @@ void registerStorageDistributed(StorageFactory & factory)
ASTs & engine_args = args.engine_args;
if (!(engine_args.size() == 3 || engine_args.size() == 4))
throw Exception("Storage Distributed requires 3 or 4 parameters"
" - name of configuration section with list of remote servers, name of remote database, name of remote table,"
" sharding key expression (optional).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (engine_args.size() < 3 || engine_args.size() > 5)
throw Exception(
"Storage Distributed requires from 3 to 5 parameters - "
"name of configuration section with list of remote servers, "
"name of remote database, "
"name of remote table, "
"sharding key expression (optional), "
"policy to store data in (optional).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String cluster_name = getClusterName(*engine_args[0]);
@@ -679,7 +727,8 @@ void registerStorageDistributed(StorageFactory & factory)
String remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
String remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const auto & sharding_key = engine_args.size() == 4 ? engine_args[3] : nullptr;
const auto & sharding_key = engine_args.size() >= 4 ? engine_args[3] : nullptr;
const auto & storage_policy = engine_args.size() >= 5 ? engine_args[4]->as<ASTLiteral &>().value.safeGet<String>() : "";
/// Check that sharding_key exists in the table and has numeric type.
if (sharding_key)
@@ -700,7 +749,10 @@ void registerStorageDistributed(StorageFactory & factory)
return StorageDistributed::create(
args.table_id, args.columns, args.constraints,
remote_database, remote_table, cluster_name,
args.context, sharding_key, args.relative_data_path,
args.context,
sharding_key,
storage_policy,
args.relative_data_path,
args.attach);
});
}

Storages/StorageDistributed.h

@@ -20,6 +20,9 @@ namespace DB
class Context;
class StorageDistributedDirectoryMonitor;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
/** A distributed table that resides on multiple servers.
* Uses data from the specified database and tables on each server.
@@ -81,6 +84,7 @@ public:
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameOnDisk(const String & new_path_to_table_data);
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
@@ -92,22 +96,20 @@ public:
void startup() override;
void shutdown() override;
Strings getDataPaths() const override { return {path}; }
Strings getDataPaths() const override;
const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }
const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
size_t getShardCount() const;
const String & getPath() const { return path; }
std::pair<const std::string &, const std::string &> getPath();
std::string getRemoteDatabaseName() const { return remote_database; }
std::string getRemoteTableName() const { return remote_table; }
std::string getClusterName() const { return cluster_name; } /// Returns empty string if table is used by TableFunctionRemote
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors();
/// ensure directory monitor thread creation by subdirectory name
void requireDirectoryMonitor(const std::string & name);
/// ensure connection pool creation and return it
ConnectionPoolPtr requireConnectionPool(const std::string & name);
void createDirectoryMonitors(const std::string & disk);
/// ensure directory monitor thread and connection pool creation by disk and subdirectory name
void requireDirectoryMonitor(const std::string & disk, const std::string & name);
void flushClusterNodesAllData();
@@ -131,24 +133,6 @@ public:
bool has_sharding_key;
ExpressionActionsPtr sharding_key_expr;
String sharding_key_column_name;
String path; /// Can be empty if data_path_ is empty. In this case, a directory for the data to be sent is not created.
struct ClusterNodeData
{
std::unique_ptr<StorageDistributedDirectoryMonitor> directory_monitor;
ConnectionPoolPtr conneciton_pool;
/// Creates connection_pool if not exists.
void requireConnectionPool(const std::string & name, const StorageDistributed & storage);
/// Creates directory_monitor if not exists.
void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker);
void flushAllData();
void shutdownAndDropAllData();
};
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
std::mutex cluster_nodes_mutex;
/// Used for global monotonic ordering of files to send.
SimpleIncrement file_names_increment;
@@ -165,6 +149,7 @@ protected:
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & relative_data_path_,
bool attach_);
@@ -176,10 +161,30 @@ protected:
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & relative_data_path_,
bool attach);
ClusterPtr skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info);
void createStorage();
String storage_policy;
String relative_data_path;
/// Can be empty if relative_data_path is empty. In this case, a directory for the data to be sent is not created.
VolumePtr volume;
struct ClusterNodeData
{
std::unique_ptr<StorageDistributedDirectoryMonitor> directory_monitor;
ConnectionPoolPtr conneciton_pool;
void flushAllData();
void shutdownAndDropAllData();
};
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
std::mutex cluster_nodes_mutex;
};
}

tests/integration/test_distributed_storage_configuration/configs/storage_configuration.xml

@@ -0,0 +1,23 @@
<yandex>
<storage_configuration>
<disks>
<disk1>
<path>/disk1/</path>
</disk1>
<disk2>
<path>/disk2/</path>
</disk2>
</disks>
<policies>
<default>
<volumes>
<main>
<disk>disk1</disk>
<disk>disk2</disk>
</main>
</volumes>
</default>
</policies>
</storage_configuration>
</yandex>
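
The policy above keeps two disks in a single volume, which the new code requires; `volume->getNextDisk()` then rotates queued sends between `/disk1` and `/disk2`. One way to check that the server picked the policy up, assuming a build that ships the `system.storage_policies` table:

```sql
-- Lists the volume and disks of the test's 'default' policy.
SELECT policy_name, volume_name, disks
FROM system.storage_policies
WHERE policy_name = 'default';
```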

tests/integration/test_distributed_storage_configuration/test.py

@@ -0,0 +1,65 @@
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node',
config_dir='configs',
tmpfs=['/disk1:size=100M', '/disk2:size=100M'])
@pytest.fixture(scope='module')
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def _files_in_dist_mon(node, root, table):
return int(node.exec_in_container([
'bash',
'-c',
# `-maxdepth 1` to avoid /tmp/ subdirectory
'find /{root}/data/default/{table}/default@127%2E0%2E0%2E2:9000 -maxdepth 1 -type f | wc -l'.format(root=root, table=table)
]).split('\n')[0])
def test_different_versions(start_cluster):
node.query('CREATE TABLE foo (key Int) Engine=Memory()')
node.query("""
CREATE TABLE dist_foo (key Int)
Engine=Distributed(
test_cluster_two_shards,
currentDatabase(),
foo,
key%2,
'default'
)
""")
# manual only
node.query('SYSTEM STOP DISTRIBUTED SENDS dist_foo')
node.query('INSERT INTO dist_foo SELECT * FROM numbers(100)')
assert _files_in_dist_mon(node, 'disk1', 'dist_foo') == 1
assert _files_in_dist_mon(node, 'disk2', 'dist_foo') == 0
assert node.query('SELECT count() FROM dist_foo') == '100\n'
node.query('SYSTEM FLUSH DISTRIBUTED dist_foo')
assert node.query('SELECT count() FROM dist_foo') == '200\n'
#
# RENAME
#
node.query('RENAME TABLE dist_foo TO dist2_foo')
node.query('INSERT INTO dist2_foo SELECT * FROM numbers(100)')
assert _files_in_dist_mon(node, 'disk1', 'dist2_foo') == 0
assert _files_in_dist_mon(node, 'disk2', 'dist2_foo') == 1
assert node.query('SELECT count() FROM dist2_foo') == '300\n'
node.query('SYSTEM FLUSH DISTRIBUTED dist2_foo')
assert node.query('SELECT count() FROM dist2_foo') == '400\n'

docs/en/operations/table_engines/distributed.md

@@ -3,11 +3,23 @@
**The Distributed engine does not store data itself**, but allows distributed query processing on multiple servers.
Reading is automatically parallelized. During a read, the table indexes on remote servers are used, if there are any.
The Distributed engine accepts parameters: the cluster name in the server's config file, the name of a remote database, the name of a remote table, and (optionally) a sharding key.
The Distributed engine accepts parameters:
- the cluster name in the server's config file
- the name of a remote database
- the name of a remote table
- (optionally) sharding key
- (optionally) policy name; it will be used to store temporary files for asynchronous sends
See also:
- `insert_distributed_sync` setting
- [MergeTree](../mergetree.md#table_engine-mergetree-multiple-volumes) for examples
Example:
```sql
Distributed(logs, default, hits[, sharding_key])
Distributed(logs, default, hits[, sharding_key[, policy_name]])
```
Data will be read from all servers in the 'logs' cluster, from the default.hits table located on every server in the cluster.
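
As the integration test above demonstrates, the asynchronous send queue can be driven manually. A short sketch for a hypothetical Distributed table `dist` created with a policy as described:

```sql
SYSTEM STOP DISTRIBUTED SENDS dist;   -- queue inserts on the policy's disks without sending
INSERT INTO dist SELECT number FROM numbers(100);
SYSTEM FLUSH DISTRIBUTED dist;        -- push the queued .bin files to the remote shards
SYSTEM START DISTRIBUTED SENDS dist;  -- resume automatic background sends
```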