mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 02:52:13 +00:00
Support specifying settings for Distributed engine in config (like for MergeTree)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
parent
1404b73b50
commit
4248ae509a
@ -1401,6 +1401,13 @@
|
||||
</replicated_merge_tree>
|
||||
-->
|
||||
|
||||
<!-- Settings to fine tune Distributed tables. See documentation in source code, in DistributedSettings.h -->
|
||||
<!--
|
||||
<distributed>
|
||||
<flush_on_detach>false</flush_on_detach>
|
||||
</distributed>
|
||||
-->
|
||||
|
||||
<!-- Protection from accidental DROP.
|
||||
If size of a MergeTree table is greater than max_table_size_to_drop (in bytes) than table could not be dropped with any DROP query.
|
||||
If you want do delete one table and don't want to change clickhouse-server config, you could create special file <clickhouse-path>/flags/force_drop_table and make DROP once.
|
||||
|
@ -29,6 +29,7 @@
|
||||
#include <Storages/MergeTree/ReplicatedFetchList.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/MergeTree/MergeTreeSettings.h>
|
||||
#include <Storages/Distributed/DistributedSettings.h>
|
||||
#include <Storages/CompressionCodecSelector.h>
|
||||
#include <Storages/StorageS3Settings.h>
|
||||
#include <Disks/DiskLocal.h>
|
||||
@ -111,6 +112,7 @@
|
||||
#include <Parsers/FunctionParameterValuesVisitor.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
|
||||
#include <base/defines.h>
|
||||
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
@ -352,6 +354,7 @@ struct ContextSharedPart : boost::noncopyable
|
||||
|
||||
std::optional<MergeTreeSettings> merge_tree_settings TSA_GUARDED_BY(mutex); /// Settings of MergeTree* engines.
|
||||
std::optional<MergeTreeSettings> replicated_merge_tree_settings TSA_GUARDED_BY(mutex); /// Settings of ReplicatedMergeTree* engines.
|
||||
std::optional<DistributedSettings> distributed_settings TSA_GUARDED_BY(mutex);
|
||||
std::atomic_size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
|
||||
std::atomic_size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default)
|
||||
/// No lock required for format_schema_path modified only during initialization
|
||||
@ -4092,6 +4095,21 @@ const MergeTreeSettings & Context::getReplicatedMergeTreeSettings() const
|
||||
return *shared->replicated_merge_tree_settings;
|
||||
}
|
||||
|
||||
const DistributedSettings & Context::getDistributedSettings() const
|
||||
{
|
||||
std::lock_guard lock(shared->mutex);
|
||||
|
||||
if (!shared->distributed_settings)
|
||||
{
|
||||
const auto & config = shared->getConfigRefWithLock(lock);
|
||||
DistributedSettings distributed_settings;
|
||||
distributed_settings.loadFromConfig("distributed", config);
|
||||
shared->distributed_settings.emplace(distributed_settings);
|
||||
}
|
||||
|
||||
return *shared->distributed_settings;
|
||||
}
|
||||
|
||||
const StorageS3Settings & Context::getStorageS3Settings() const
|
||||
{
|
||||
std::lock_guard lock(shared->mutex);
|
||||
|
@ -113,6 +113,7 @@ class BlobStorageLog;
|
||||
class IAsynchronousReader;
|
||||
class IOUringReader;
|
||||
struct MergeTreeSettings;
|
||||
struct DistributedSettings;
|
||||
struct InitialAllRangesAnnouncement;
|
||||
struct ParallelReadRequest;
|
||||
struct ParallelReadResponse;
|
||||
@ -1073,6 +1074,7 @@ public:
|
||||
|
||||
const MergeTreeSettings & getMergeTreeSettings() const;
|
||||
const MergeTreeSettings & getReplicatedMergeTreeSettings() const;
|
||||
const DistributedSettings & getDistributedSettings() const;
|
||||
const StorageS3Settings & getStorageS3Settings() const;
|
||||
|
||||
/// Prevents DROP TABLE if its size is greater than max_size (50GB by default, max_size=0 turn off this check)
|
||||
|
@ -15,6 +15,27 @@ namespace ErrorCodes
|
||||
|
||||
IMPLEMENT_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)
|
||||
|
||||
void DistributedSettings::loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
if (!config.has(config_elem))
|
||||
return;
|
||||
|
||||
Poco::Util::AbstractConfiguration::Keys config_keys;
|
||||
config.keys(config_elem, config_keys);
|
||||
|
||||
try
|
||||
{
|
||||
for (const String & key : config_keys)
|
||||
set(key, config.getString(config_elem + "." + key));
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
|
||||
e.addMessage("in Distributed config");
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void DistributedSettings::loadFromQuery(ASTStorage & storage_def)
|
||||
{
|
||||
if (storage_def.settings)
|
||||
|
@ -37,6 +37,7 @@ DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)
|
||||
*/
|
||||
struct DistributedSettings : public BaseSettings<DistributedSettingsTraits>
|
||||
{
|
||||
void loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config);
|
||||
void loadFromQuery(ASTStorage & storage_def);
|
||||
};
|
||||
|
||||
|
@ -1922,7 +1922,7 @@ void registerStorageDistributed(StorageFactory & factory)
|
||||
}
|
||||
|
||||
/// TODO: move some arguments from the arguments to the SETTINGS.
|
||||
DistributedSettings distributed_settings;
|
||||
DistributedSettings distributed_settings = context->getDistributedSettings();
|
||||
if (args.storage_def->settings)
|
||||
{
|
||||
distributed_settings.loadFromQuery(*args.storage_def);
|
||||
|
@ -0,0 +1,5 @@
|
||||
<clickhouse>
|
||||
<distributed>
|
||||
<flush_on_detach>0</flush_on_detach>
|
||||
</distributed>
|
||||
</clickhouse>
|
49
tests/integration/test_distributed_config/test.py
Normal file
49
tests/integration/test_distributed_config/test.py
Normal file
@ -0,0 +1,49 @@
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
import logging
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node = cluster.add_instance("node", main_configs=["configs/overrides.xml"])
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def start_cluster():
|
||||
try:
|
||||
logging.info("Starting cluster...")
|
||||
cluster.start()
|
||||
logging.info("Cluster started")
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_distibuted_settings(start_cluster):
|
||||
node.query("")
|
||||
node.query(
|
||||
"""
|
||||
CREATE TABLE data_1 (key Int) ENGINE Memory();
|
||||
CREATE TABLE dist_1 as data_1 ENGINE Distributed(default, default, data_1) SETTINGS flush_on_detach = true;
|
||||
SYSTEM STOP DISTRIBUTED SENDS dist_1;
|
||||
INSERT INTO dist_1 SETTINGS prefer_localhost_replica=0 VALUES (1);
|
||||
DETACH TABLE dist_1;
|
||||
"""
|
||||
)
|
||||
assert "flush_on_detach = 1" in node.query("SHOW CREATE dist_1")
|
||||
# flush_on_detach=true, so data_1 should have 1 row
|
||||
assert int(node.query("SELECT count() FROM data_1")) == 1
|
||||
|
||||
node.query(
|
||||
"""
|
||||
CREATE TABLE data_2 (key Int) ENGINE Memory();
|
||||
CREATE TABLE dist_2 as data_2 ENGINE Distributed(default, default, data_2);
|
||||
SYSTEM STOP DISTRIBUTED SENDS dist_2;
|
||||
INSERT INTO dist_2 SETTINGS prefer_localhost_replica=0 VALUES (2);
|
||||
DETACH TABLE dist_2;
|
||||
"""
|
||||
)
|
||||
## Settings are not added to CREATE (only specific one, like index_granularity for MergeTree)
|
||||
# assert "flush_on_detach = 0" in node.query("SHOW CREATE dist_2")
|
||||
|
||||
# But settins are applied (flush_on_detach=false in config, so data_2 should not have any rows)
|
||||
assert int(node.query("SELECT count() FROM data_2")) == 0
|
Loading…
Reference in New Issue
Block a user