Support single disk instead of storage policy

This commit is contained in:
kssenii 2022-09-27 22:34:31 +02:00
parent 036d1c8cbc
commit 01c4299481
11 changed files with 209 additions and 14 deletions

View File

@ -400,11 +400,16 @@ StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Uti
{
std::shared_ptr<StoragePolicySelector> result = std::make_shared<StoragePolicySelector>(config, config_prefix, disks);
std::lock_guard lock(mutex);
/// First pass, check.
for (const auto & [name, policy] : policies)
{
if (name.starts_with(TMP_STORAGE_POLICY_PREFIX))
continue;
if (!result->policies.contains(name))
throw Exception("Storage policy " + backQuote(name) + " is missing in new configuration", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Storage policy {} is missing in new configuration", backQuote(name));
policy->checkCompatibleWith(result->policies[name]);
}
@ -412,20 +417,43 @@ StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Uti
/// Second pass, load.
for (const auto & [name, policy] : policies)
{
/// Do not reload from config temporary storage policy, because it is not present in config.
if (name.starts_with(TMP_STORAGE_POLICY_PREFIX))
result->policies[name] = policy;
else
result->policies[name] = std::make_shared<StoragePolicy>(policy, config, config_prefix + "." + name, disks);
}
return result;
}
StoragePolicyPtr StoragePolicySelector::get(const String & name) const
StoragePolicyPtr StoragePolicySelector::tryGet(const String & name) const
{
std::lock_guard lock(mutex);
auto it = policies.find(name);
if (it == policies.end())
throw Exception("Unknown storage policy " + backQuote(name), ErrorCodes::UNKNOWN_POLICY);
return nullptr;
return it->second;
}
StoragePolicyPtr StoragePolicySelector::get(const String & name) const
{
auto policy = tryGet(name);
if (!policy)
throw Exception("Unknown storage policy " + backQuote(name), ErrorCodes::UNKNOWN_POLICY);
return policy;
}
void StoragePolicySelector::add(StoragePolicyPtr storage_policy)
{
std::lock_guard lock(mutex);
auto [_, inserted] = policies.emplace(storage_policy->getName(), storage_policy);
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "StoragePolicy is already present in StoragePolicySelector");
}
}

View File

@ -92,6 +92,7 @@ public:
bool hasAnyVolumeWithDisabledMerges() const override;
bool containsVolume(const String & volume_name) const override;
private:
Volumes volumes;
const String name;
@ -118,6 +119,8 @@ using StoragePoliciesMap = std::map<String, StoragePolicyPtr>;
class StoragePolicySelector
{
public:
static constexpr auto TMP_STORAGE_POLICY_PREFIX = "__";
StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks);
StoragePolicySelectorPtr updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks) const;
@ -125,11 +128,20 @@ public:
/// Policy by name
StoragePolicyPtr get(const String & name) const;
StoragePolicyPtr tryGet(const String & name) const;
/// All policies
const StoragePoliciesMap & getPoliciesMap() const { return policies; }
/// Add storage policy to StoragePolicySelector.
/// Used when storage policy needs to be created on the fly, not being present in config file.
/// Done by getOrSetStoragePolicyForSingleDisk.
void add(StoragePolicyPtr storage_policy);
private:
StoragePoliciesMap policies;
mutable std::mutex mutex;
};
}

View File

@ -33,6 +33,7 @@
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#include <Disks/StoragePolicy.h>
#include <IO/SynchronousReader.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/ActionLocksManager.h>
@ -2662,6 +2663,30 @@ StoragePolicyPtr Context::getStoragePolicy(const String & name) const
return policy_selector->get(name);
}
StoragePolicyPtr Context::getOrSetStoragePolicyForSingleDisk(const String & name) const
{
std::lock_guard lock(shared->storage_policies_mutex);
const std::string storage_policy_name = StoragePolicySelector::TMP_STORAGE_POLICY_PREFIX + name;
auto storage_policy_selector = getStoragePolicySelector(lock);
StoragePolicyPtr storage_policy = storage_policy_selector->tryGet(storage_policy_name);
if (!storage_policy)
{
auto disk_selector = getDiskSelector(lock);
auto disk = disk_selector->get(name);
auto volume = std::make_shared<SingleDiskVolume>("_volume_" + name, disk);
static const auto move_factor_for_single_disk_volume = 0.0;
storage_policy = std::make_shared<StoragePolicy>(storage_policy_name, Volumes{volume}, move_factor_for_single_disk_volume);
const_cast<StoragePolicySelector *>(storage_policy_selector.get())->add(storage_policy);
}
/// Note: it is important to put storage policy into disk selector (and not recreate it on each call)
/// because in some places there are checks that storage policy pointers are the same from different tables.
/// (We can assume that tables with the same `disk` setting are on the same storage policy).
return storage_policy;
}
DisksMap Context::getDisksMap() const
{

View File

@ -918,6 +918,8 @@ public:
/// Provides storage politics schemes
StoragePolicyPtr getStoragePolicy(const String & name) const;
StoragePolicyPtr getOrSetStoragePolicyForSingleDisk(const String & name) const;
/// Get the server uptime in seconds.
double getUptimeSeconds() const;

View File

@ -234,6 +234,10 @@ MergeTreeData::MergeTreeData(
context_->getGlobalContext()->initializeBackgroundExecutorsIfNeeded();
const auto settings = getSettings();
if (settings->disk.changed && settings->storage_policy.changed)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "MergeTree settings `storage_policy` and `disk` cannot be specified at the same time");
allow_nullable_key = attach || settings->allow_nullable_key;
if (relative_data_path.empty())
@ -365,7 +369,15 @@ MergeTreeData::MergeTreeData(
StoragePolicyPtr MergeTreeData::getStoragePolicy() const
{
return getContext()->getStoragePolicy(getSettings()->storage_policy);
const auto & settings = getSettings();
StoragePolicyPtr storage_policy;
if (settings->disk.changed)
storage_policy = getContext()->getOrSetStoragePolicyForSingleDisk(settings->disk);
else
storage_policy = getContext()->getStoragePolicy(settings->storage_policy);
return storage_policy;
}
bool MergeTreeData::supportsFinal() const

View File

@ -128,6 +128,7 @@ struct Settings;
M(MaxThreads, max_part_removal_threads, 0, "The number of threads for concurrent removal of inactive data parts. One is usually enough, but in 'Google Compute Environment SSD Persistent Disks' file removal (unlink) operation is extraordinarily slow and you probably have to increase this number (recommended is up to 16).", 0) \
M(UInt64, concurrent_part_removal_threshold, 100, "Activate concurrent part removal (see 'max_part_removal_threads') only if the number of inactive data parts is at least this.", 0) \
M(String, storage_policy, "default", "Name of storage disk policy", 0) \
M(String, disk, "", "Name of storage disk. Can be specified instead of storage policy.", 0) \
M(Bool, allow_nullable_key, false, "Allow Nullable types as primary keys.", 0) \
M(Bool, remove_empty_parts, true, "Remove empty parts after they were pruned by TTL, mutation, or collapsing merge algorithm.", 0) \
M(Bool, assign_part_uuids, false, "Generate UUIDs for parts. Before enabling check that all replicas support new format.", 0) \

View File

@ -683,8 +683,6 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (replicated)
{
auto storage_policy = args.getContext()->getStoragePolicy(storage_settings->storage_policy);
return std::make_shared<StorageReplicatedMergeTree>(
zookeeper_path,
replica_name,

View File

@ -1632,13 +1632,18 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table);
if (!dest_table_storage)
throw Exception("Table " + getStorageID().getNameForLogs() + " supports movePartitionToTable only for MergeTree family of table engines."
" Got " + dest_table->getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Table {} supports movePartitionToTable only for MergeTree family of table engines. Got {}",
getStorageID().getNameForLogs(), dest_table->getName());
if (dest_table_storage->getStoragePolicy() != this->getStoragePolicy())
throw Exception("Destination table " + dest_table_storage->getStorageID().getNameForLogs() +
" should have the same storage policy of source table " + getStorageID().getNameForLogs() + ". " +
getStorageID().getNameForLogs() + ": " + this->getStoragePolicy()->getName() + ", " +
dest_table_storage->getStorageID().getNameForLogs() + ": " + dest_table_storage->getStoragePolicy()->getName(), ErrorCodes::UNKNOWN_POLICY);
throw Exception(
ErrorCodes::UNKNOWN_POLICY,
"Destination table {} should have the same storage policy of source table {}. {} : {}, {} : {}",
dest_table_storage->getStorageID().getNameForLogs(), getStorageID().getNameForLogs(),
getStorageID().getNameForLogs(), this->getStoragePolicy()->getName(),
dest_table_storage->getStorageID().getNameForLogs(), dest_table_storage->getStoragePolicy()->getName());
auto dest_metadata_snapshot = dest_table->getInMemoryMetadataPtr();
auto metadata_snapshot = getInMemoryMetadataPtr();

View File

@ -0,0 +1,25 @@
<clickhouse>
<storage_configuration>
<disks>
<disk_local>
<type>local</type>
<path>/disk_local/</path>
</disk_local>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<local>
<volumes>
<main>
<disk>disk_local</disk>
</main>
</volumes>
</local>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,87 @@
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
TABLE_NAME = "test"
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.add_instance(
"node1",
main_configs=[
"configs/config.d/storage_configuration.xml",
],
with_zookeeper=True,
stay_alive=True,
with_minio=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_merge_tree_disk_setting(start_cluster):
node1 = cluster.instances["node1"]
assert (
"MergeTree settings `storage_policy` and `disk` cannot be specified at the same time"
in node1.query_and_get_error(
f"""
DROP TABLE IF EXISTS {TABLE_NAME};
CREATE TABLE {TABLE_NAME} (a Int32)
ENGINE = MergeTree()
ORDER BY tuple()
SETTINGS disk = 'disk_local', storage_policy = 's3';
"""
)
)
node1.query(
f"""
DROP TABLE IF EXISTS {TABLE_NAME};
CREATE TABLE {TABLE_NAME} (a Int32)
ENGINE = MergeTree()
ORDER BY tuple()
SETTINGS disk = 's3';
"""
)
minio = cluster.minio_client
count = len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
node1.query(f"INSERT INTO {TABLE_NAME} SELECT number FROM numbers(100)")
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
assert len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))) > count
node1.query(
f"""
DROP TABLE IF EXISTS {TABLE_NAME}_2;
CREATE TABLE {TABLE_NAME}_2 (a Int32)
ENGINE = MergeTree()
ORDER BY tuple()
SETTINGS disk = 's3';
"""
)
node1.query(f"INSERT INTO {TABLE_NAME}_2 SELECT number FROM numbers(100)")
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}_2")) == 100
assert "__s3" in node1.query(f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}'").strip()
assert "__s3" in node1.query(f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}_2'").strip()
node1.query("SYSTEM RELOAD CONFIG")
assert not node1.contains_in_log("An error has occurred while reloading storage policies, storage policies were not applied")
assert "['s3']" in node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = '__s3'").strip()
node1.restart_clickhouse()
assert "_s3" in node1.query(f"SELECT storage_policy FROM system.tables WHERE name = '{TABLE_NAME}'").strip()
assert "['s3']" in node1.query("SELECT disks FROM system.storage_policies WHERE policy_name = '__s3'").strip()
assert int(node1.query(f"SELECT count() FROM {TABLE_NAME}")) == 100