Merge pull request #61769 from kirillgarbar/modify-engine

Search for convert_to_replicated flag at the correct path
alesapin 2024-04-24 18:17:29 +00:00 committed by GitHub
commit 1b562ce569
6 changed files with 171 additions and 11 deletions


@@ -304,10 +304,10 @@ We use the term `MergeTree` to refer to all table engines in the `MergeTree fami
 If you had a `MergeTree` table that was manually replicated, you can convert it to a replicated table. You might need to do this if you have already collected a large amount of data in a `MergeTree` table and now you want to enable replication.
-A `MergeTree` table can be converted automatically on server restart if the `convert_to_replicated` flag is set in the table's data directory (`/var/lib/clickhouse/store/xxx/xxxyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy/` for the `Atomic` database).
+A `MergeTree` table can be converted automatically on server restart if the `convert_to_replicated` flag is set in the table's data directory (`/store/xxx/xxxyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy/` for the `Atomic` database).
 Create an empty `convert_to_replicated` file and the table will be loaded as replicated on the next server restart.
-This query can be used to get the table's data path.
+This query can be used to get the table's data path. If the table has multiple data paths, use the first one.

 ```sql
 SELECT data_paths FROM system.tables WHERE table = 'table_name' AND database = 'database_name';
 ```
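
As an aside, the manual procedure the docs describe boils down to: find the table's (first) data path and drop an empty `convert_to_replicated` file into it. A minimal Python sketch of that step, assuming `clickhouse-client` is available on the server host and the script can write to the data directory (none of this is part of the commit):

```python
# Sketch only (not part of this commit): create the convert_to_replicated flag
# next to the table's data. Assumes clickhouse-client is on PATH and the
# script runs on the server host with write access to the data directory.
import subprocess
from pathlib import Path


def set_convert_flag(table: str, database: str) -> Path:
    # Ask the server for the table's data paths and take the first one,
    # as the documentation above recommends for multi-disk tables.
    out = subprocess.run(
        [
            "clickhouse-client",
            "--query",
            f"SELECT data_paths[1] FROM system.tables "
            f"WHERE table = '{table}' AND database = '{database}'",
        ],
        capture_output=True,
        text=True,
        check=True,
    ).stdout.strip()

    flag = Path(out) / "convert_to_replicated"
    flag.touch()  # an empty file is enough; the server deletes it after conversion
    return flag


if __name__ == "__main__":
    print(set_convert_flag("table_name", "database_name"))
```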


@@ -95,16 +95,21 @@ static void setReplicatedEngine(ASTCreateQuery * create_query, ContextPtr contex
     create_query->storage->set(create_query->storage->engine, engine->clone());
 }

-String DatabaseOrdinary::getConvertToReplicatedFlagPath(const String & name, bool tableStarted)
+String DatabaseOrdinary::getConvertToReplicatedFlagPath(const String & name, const StoragePolicyPtr storage_policy, bool tableStarted)
 {
     fs::path data_path;
+    if (storage_policy->getDisks().empty())
+        data_path = getContext()->getPath();
+    else
+        data_path = storage_policy->getDisks()[0]->getPath();
+
     if (!tableStarted)
     {
         auto create_query = tryGetCreateTableQuery(name, getContext());
-        data_path = fs::path(getContext()->getPath()) / getTableDataPath(create_query->as<ASTCreateQuery &>());
+        data_path = data_path / getTableDataPath(create_query->as<ASTCreateQuery &>());
     }
     else
-        data_path = fs::path(getContext()->getPath()) / getTableDataPath(name);
+        data_path = data_path / getTableDataPath(name);

     return (data_path / CONVERT_TO_REPLICATED_FLAG_NAME).string();
 }
@@ -120,7 +125,14 @@ void DatabaseOrdinary::convertMergeTreeToReplicatedIfNeeded(ASTPtr ast, const Qu
     if (!create_query->storage || !create_query->storage->engine->name.ends_with("MergeTree") || create_query->storage->engine->name.starts_with("Replicated") || create_query->storage->engine->name.starts_with("Shared"))
         return;

-    auto convert_to_replicated_flag_path = getConvertToReplicatedFlagPath(qualified_name.table, false);
+    /// Get table's storage policy
+    MergeTreeSettings default_settings = getContext()->getMergeTreeSettings();
+    auto policy = getContext()->getStoragePolicy(default_settings.storage_policy);
+    if (auto * query_settings = create_query->storage->settings)
+        if (Field * policy_setting = query_settings->changes.tryGet("storage_policy"))
+            policy = getContext()->getStoragePolicy(policy_setting->safeGet<String>());
+
+    auto convert_to_replicated_flag_path = getConvertToReplicatedFlagPath(qualified_name.table, policy, false);

     if (!fs::exists(convert_to_replicated_flag_path))
         return;
@@ -288,7 +300,7 @@ void DatabaseOrdinary::restoreMetadataAfterConvertingToReplicated(StoragePtr tab
     if (!rmt)
         return;

-    auto convert_to_replicated_flag_path = getConvertToReplicatedFlagPath(name.table, true);
+    auto convert_to_replicated_flag_path = getConvertToReplicatedFlagPath(name.table, table->getStoragePolicy(), true);

     if (!fs::exists(convert_to_replicated_flag_path))
         return;
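
In short: before this change the flag was always looked up under the server's default data path; now it is looked up under the first disk of the table's storage policy, with the default path only as a fallback for a policy without disks. A rough Python sketch of that resolution rule (the function, argument names, and paths here are illustrative, not the ClickHouse API):

```python
# Illustration only (not ClickHouse code): the flag-path resolution rule
# introduced by this commit, expressed in plain Python.
from pathlib import Path


def convert_flag_path(disk_paths, default_path, table_data_path):
    # The first disk of the table's storage policy wins; the server's
    # default data path is only a fallback when the policy has no disks.
    root = Path(disk_paths[0]) if disk_paths else Path(default_path)
    return root / table_data_path / "convert_to_replicated"


# A table on the 'jbod' policy from the test config below (disks /jbod1/ and
# /jbod2/) now resolves under /jbod1/ rather than /var/lib/clickhouse/:
print(convert_flag_path(["/jbod1/", "/jbod2/"], "/var/lib/clickhouse/",
                        "store/xxx/xxxyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy"))
```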


@@ -86,7 +86,7 @@ protected:
 private:
     void convertMergeTreeToReplicatedIfNeeded(ASTPtr ast, const QualifiedTableName & qualified_name, const String & file_name);
     void restoreMetadataAfterConvertingToReplicated(StoragePtr table, const QualifiedTableName & name);
-    String getConvertToReplicatedFlagPath(const String & name, bool tableStarted);
+    String getConvertToReplicatedFlagPath(const String & name, StoragePolicyPtr storage_policy, bool tableStarted);
 };

 }


@@ -2,9 +2,13 @@ from helpers.cluster import ClickHouseCluster

 def get_table_path(node, table, database):
-    return node.query(
-        sql=f"SELECT data_paths FROM system.tables WHERE table = '{table}' and database = '{database}'"
-    ).strip("'[]\n")
+    return (
+        node.query(
+            sql=f"SELECT data_paths FROM system.tables WHERE table = '{table}' and database = '{database}' LIMIT 1"
+        )
+        .split(",")[0]
+        .strip("'[]\n")
+    )


 def check_flags_deleted(node, database_name, tables):
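
The updated helper keeps only the first entry of `data_paths`, matching the documentation change above. For example, on a table spread over two `jbod` disks the raw query result parses like this (the values are illustrative, not taken from a test run):

```python
# Illustrative values: what node.query() returns for a table on two disks,
# and how the updated helper reduces it to the first data path.
raw = "['/jbod1/store/xxx/xxxyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy/','/jbod2/store/xxx/xxxyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy/']\n"
first = raw.split(",")[0].strip("'[]\n")
print(first)  # /jbod1/store/xxx/xxxyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy/
```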


@@ -0,0 +1,42 @@
<clickhouse>
    <storage_configuration>
        <disks>
            <default>
                <keep_free_space_bytes>1024</keep_free_space_bytes>
            </default>
            <jbod1>
                <path>/jbod1/</path>
            </jbod1>
            <jbod2>
                <path>/jbod2/</path>
            </jbod2>
            <s3>
                <type>s3</type>
                <endpoint>http://minio1:9001/root/data/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
            </s3>
        </disks>
        <policies>
            <jbod>
                <volumes>
                    <jbod_volume>
                        <disk>jbod1</disk>
                        <disk>jbod2</disk>
                    </jbod_volume>
                </volumes>
            </jbod>
            <s3>
                <volumes>
                    <s3_volume>
                        <disk>s3</disk>
                    </s3_volume>
                </volumes>
            </s3>
        </policies>
    </storage_configuration>
    <merge_tree>
        <storage_policy>jbod</storage_policy>
    </merge_tree>
</clickhouse>


@@ -0,0 +1,102 @@
import pytest
from test_modify_engine_on_restart.common import check_flags_deleted, set_convert_flags
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
    "ch1",
    main_configs=[
        "configs/config.d/clusters.xml",
        "configs/config.d/distributed_ddl.xml",
        "configs/config.d/storage_policies.xml",
    ],
    with_zookeeper=True,
    with_minio=True,
    macros={"replica": "node1"},
    stay_alive=True,
)

database_name = "modify_engine_storage_policies"


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def q(node, query):
    return node.query(database=database_name, sql=query)


def create_tables():
    # Implicit jbod (set default in config)
    q(
        ch1,
        "CREATE TABLE jbod_imp ( A Int64, D Date, S String ) ENGINE MergeTree() PARTITION BY toYYYYMM(D) ORDER BY A;",
    )

    # Explicit jbod
    q(
        ch1,
        """
        CREATE TABLE jbod_exp ( A Int64, D Date, S String ) ENGINE MergeTree() PARTITION BY toYYYYMM(D) ORDER BY A
        SETTINGS storage_policy='jbod';
        """,
    )

    # s3
    q(
        ch1,
        """
        CREATE TABLE s3 ( A Int64, D Date, S String ) ENGINE MergeTree() PARTITION BY toYYYYMM(D) ORDER BY A
        SETTINGS storage_policy='s3';
        """,
    )

    # Default
    q(
        ch1,
        """
        CREATE TABLE default ( A Int64, D Date, S String ) ENGINE MergeTree() PARTITION BY toYYYYMM(D) ORDER BY A
        SETTINGS storage_policy='default';
        """,
    )


def check_tables(converted):
    engine_prefix = ""
    if converted:
        engine_prefix = "Replicated"

    assert (
        q(
            ch1,
            f"SELECT name, engine FROM system.tables WHERE database = '{database_name}'",
        ).strip()
        == f"default\t{engine_prefix}MergeTree\njbod_exp\t{engine_prefix}MergeTree\njbod_imp\t{engine_prefix}MergeTree\ns3\t{engine_prefix}MergeTree"
    )


def test_modify_engine_on_restart(started_cluster):
    ch1.query("CREATE DATABASE " + database_name)
    create_tables()

    check_tables(False)

    ch1.restart_clickhouse()

    check_tables(False)

    set_convert_flags(ch1, database_name, ["default", "jbod_exp", "jbod_imp", "s3"])

    ch1.restart_clickhouse()

    check_flags_deleted(ch1, database_name, ["default", "jbod_exp", "jbod_imp", "s3"])

    check_tables(True)