Make absolute paths on disks start from /clickhouse/data, fix freeze and rename, add tests

Author: alesapin, 2019-09-11 13:57:32 +03:00
Parent: 34a454aa62
Commit: 0b8aec469f
5 changed files with 89 additions and 18 deletions
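
In effect, every disk now keeps table data under <disk_path>/clickhouse/data/<database>/<table>/ instead of directly under <disk_path>/<database>/<table>/, and FREEZE writes its backups into a per-disk <disk_path>/clickhouse/shadow/ directory. An illustrative layout for the test disks used below (the <increment> component is the counter kept in shadow/increment.txt; its exact placement in backup_path is not shown in this diff and is an assumption here):

    /jbod1/clickhouse/data/default/freezing_table/                        table parts on disk jbod1
    /jbod1/clickhouse/shadow/<increment>/data/default/freezing_table/     frozen copies on the same disk
    /external/clickhouse/data/default/freezing_table/                     parts that spilled to the external disk
    /external/clickhouse/shadow/<increment>/data/default/freezing_table/  frozen copies on the external disk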

==== Changed file 1 of 5 ====

@@ -126,9 +126,12 @@ public:
     /// Disk name from configuration;
     const String & getName() const override { return name; }
 
-    /// Path on fs
+    /// Path on fs to disk
     const String & getPath() const { return path; }
 
+    /// Path to clickhouse data folder on this disk
+    String getClickHouseDataPath() const { return path + "clickhouse/data/"; }
+
     /// Amount of bytes which should be kept free on this disk
     UInt64 getKeepingFreeSpace() const { return keep_free_space_bytes; }
@@ -144,6 +147,7 @@ public:
     /// Currently available (prev method) minus already reserved space
     UInt64 getUnreservedSpace() const;
 
private:
     const String name;
     const String path;
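
As an aside, a minimal standalone sketch of how the new accessor is meant to compose with MergeTreeData::getFullPathOnDisk from the next file. The Disk struct here is a stripped-down stand-in for DiskSpace::Disk, and escapeForFileName is omitted for brevity:

    #include <string>
    #include <iostream>

    // Simplified stand-in for DiskSpace::Disk from the hunk above.
    struct Disk
    {
        std::string name;
        std::string path;   // e.g. "/jbod1/", kept with a trailing slash

        // Mirrors the new accessor: all ClickHouse data on a disk lives
        // under <disk_path>/clickhouse/data/.
        std::string getClickHouseDataPath() const { return path + "clickhouse/data/"; }
    };

    // Mirrors MergeTreeData::getFullPathOnDisk (escapeForFileName omitted).
    std::string fullPathOnDisk(const Disk & disk, const std::string & db, const std::string & table)
    {
        return disk.getClickHouseDataPath() + db + '/' + table + '/';
    }

    int main()
    {
        Disk jbod{"jbod1", "/jbod1/"};
        std::cout << fullPathOnDisk(jbod, "default", "renaming_table") << '\n';
        // prints: /jbod1/clickhouse/data/default/renaming_table/
    }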

==== Changed file 2 of 5 ====

@@ -1187,7 +1187,7 @@ void MergeTreeData::rename(
     for (const auto & disk : disks)
     {
-        auto new_full_path = disk->getPath() + new_file_db_name + '/' + new_file_table_name + '/';
+        auto new_full_path = disk->getClickHouseDataPath() + new_file_db_name + '/' + new_file_table_name + '/';
         if (Poco::File{new_full_path}.exists())
             throw Exception{"Target path already exists: " + new_full_path, ErrorCodes::DIRECTORY_ALREADY_EXISTS};
@@ -1195,8 +1195,8 @@ void MergeTreeData::rename(
     for (const auto & disk : disks)
     {
-        auto full_path = disk->getPath() + old_file_db_name + '/' + old_file_table_name + '/';
-        auto new_db_path = disk->getPath() + new_file_db_name + '/';
+        auto full_path = disk->getClickHouseDataPath() + old_file_db_name + '/' + old_file_table_name + '/';
+        auto new_db_path = disk->getClickHouseDataPath() + new_file_db_name + '/';
         Poco::File db_file{new_db_path};
         if (!db_file.exists())
@@ -3235,8 +3235,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
     LOG_DEBUG(log, "Cloning part " << src_part_absolute_path.toString() << " to " << dst_part_absolute_path.toString());
     localBackup(src_part_absolute_path, dst_part_absolute_path);
-    MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared<MergeTreeData::DataPart>(*this,
-        reservation->getDisk(), dst_part_name, dst_part_info);
+    MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared<MergeTreeData::DataPart>(
+        *this, reservation->getDisk(), dst_part_name, dst_part_info);
     dst_data_part->relative_path = tmp_dst_part_name;
     dst_data_part->is_temp = true;
@@ -3247,7 +3248,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
 String MergeTreeData::getFullPathOnDisk(const DiskSpace::DiskPtr & disk) const
 {
-    return disk->getPath() /*+ "/clickhouse/" */ + escapeForFileName(database_name) + '/' + escapeForFileName(table_name) + '/';
+    return disk->getClickHouseDataPath() + escapeForFileName(database_name) + '/' + escapeForFileName(table_name) + '/';
 }
@@ -3285,9 +3286,9 @@ Strings MergeTreeData::getDataPaths() const
 void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String & with_name, const Context & context)
 {
     String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString();
-    String global_shadow_path = clickhouse_path + "shadow/";
-    Poco::File(global_shadow_path).createDirectories();
-    auto increment = Increment(global_shadow_path + "increment.txt").get(true);
+    String default_shadow_path = clickhouse_path + "shadow/";
+    Poco::File(default_shadow_path).createDirectories();
+    auto increment = Increment(default_shadow_path + "increment.txt").get(true);
 
     /// Acquire a snapshot of active data parts to prevent removing while doing backup.
     const auto data_parts = getDataParts();
@@ -3298,10 +3299,7 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
         if (!matcher(part))
             continue;
-        String shadow_path = global_shadow_path;
-
-        /// place new folder into data directory
-        if (part->disk->getName() != "default")
-            shadow_path = part->disk->getPath() + "shadow/";
+        String shadow_path = part->disk->getPath() + "clickhouse/shadow/";
 
         Poco::File(shadow_path).createDirectories();
         String backup_path = shadow_path
@@ -3313,8 +3311,7 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
         LOG_DEBUG(log, "Freezing part " << part->name << " snapshot will be placed at " + backup_path);
         String part_absolute_path = Poco::Path(part->getFullPath()).absolute().toString();
-        String backup_part_absolute_path = part_absolute_path;
-        backup_part_absolute_path.replace(0, clickhouse_path.size(), backup_path);
+        String backup_part_absolute_path = backup_path + "data/" + getDatabaseName() + "/" + getTableName() + "/" + part->relative_path;
         localBackup(part_absolute_path, backup_part_absolute_path);
         part->is_frozen.store(true, std::memory_order_relaxed);
         ++parts_processed;
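
Two fixes interact in the hunks above: the shadow directory is now derived per disk (disk path + "clickhouse/shadow/") rather than only for non-default disks, and the backup destination is built from backup_path outright instead of string-replacing the clickhouse_path prefix of the part's absolute path, which produced wrong destinations for parts living on non-default disks. A hedged sketch of the resulting path arithmetic; the increment suffix on backup_path is an assumption, since the diff truncates that expression at "String backup_path = shadow_path":

    #include <string>
    #include <iostream>

    // Per-disk shadow directory: every disk gets its own clickhouse/shadow/.
    std::string shadowPath(const std::string & disk_path)
    {
        return disk_path + "clickhouse/shadow/";
    }

    // Backup destination for one part; "1" stands in for the increment value.
    std::string backupPartPath(const std::string & disk_path, const std::string & db,
                               const std::string & table, const std::string & part_relative_path)
    {
        std::string backup_path = shadowPath(disk_path) + "1/";  // increment suffix assumed
        return backup_path + "data/" + db + "/" + table + "/" + part_relative_path;
    }

    int main()
    {
        std::cout << backupPartPath("/external/", "default", "freezing_table", "20190301_1_1_0") << '\n';
        // prints: /external/clickhouse/shadow/1/data/default/freezing_table/20190301_1_1_0
    }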

==== Changed file 3 of 5 ====

@@ -88,7 +88,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
     M(SettingMaxThreads, max_part_loading_threads, 0, "The number of theads to load data parts at startup.") \
     M(SettingMaxThreads, max_part_removal_threads, 0, "The number of theads for concurrent removal of inactive data parts. One is usually enough, but in 'Google Compute Environment SSD Persistent Disks' file removal (unlink) operation is extraordinarily slow and you probably have to increase this number (recommended is up to 16).") \
     M(SettingUInt64, concurrent_part_removal_threshold, 100, "Activate concurrent part removal (see 'max_part_removal_threads') only if the number of inactive data parts is at least this.") \
-    M(SettingString, storage_policy_name, "default", "Name of storage policy")
+    M(SettingString, storage_policy_name, "default", "Name of storage disk policy")
 
 DECLARE_SETTINGS_COLLECTION(LIST_OF_MERGE_TREE_SETTINGS)

==== Changed file 4 of 5 ====

@@ -19,7 +19,6 @@
         </disks>
         <policies>
-            <!-- store on JBOD by default (round-robin), store big parts on external -->
             <small_jbod_with_external>
                 <volumes>
                     <main>

==== Changed file 5 of 5 ====

@@ -819,3 +819,74 @@ def test_download_appropriate_disk(start_cluster):
     finally:
         for node in [node1, node2]:
             node.query("DROP TABLE IF EXISTS replicated_table_for_download")
+
+
+def test_rename(start_cluster):
+    try:
+        node1.query("""
+            CREATE TABLE default.renaming_table (
+                s String
+            ) ENGINE = MergeTree
+            ORDER BY tuple()
+            SETTINGS storage_policy_name='small_jbod_with_external'
+        """)
+
+        for _ in range(5):
+            data = []
+            for i in range(10):
+                data.append(get_random_string(1024 * 1024)) # 1MB value
+            node1.query("INSERT INTO renaming_table VALUES {}".format(','.join(["('" + x + "')" for x in data])))
+
+        disks = get_used_disks_for_table(node1, "renaming_table")
+        assert len(disks) > 1
+        assert node1.query("SELECT COUNT() FROM default.renaming_table") == "50\n"
+
+        node1.query("RENAME TABLE default.renaming_table TO default.renaming_table1")
+        assert node1.query("SELECT COUNT() FROM default.renaming_table1") == "50\n"
+
+        with pytest.raises(QueryRuntimeException):
+            node1.query("SELECT COUNT() FROM default.renaming_table")
+
+        node1.query("CREATE DATABASE IF NOT EXISTS test")
+        node1.query("RENAME TABLE default.renaming_table1 TO test.renaming_table2")
+        assert node1.query("SELECT COUNT() FROM test.renaming_table2") == "50\n"
+
+        with pytest.raises(QueryRuntimeException):
+            node1.query("SELECT COUNT() FROM default.renaming_table1")
+
+    finally:
+        node1.query("DROP TABLE IF EXISTS default.renaming_table")
+        node1.query("DROP TABLE IF EXISTS default.renaming_table1")
+        node1.query("DROP TABLE IF EXISTS test.renaming_table2")
+
+
+def test_freeze(start_cluster):
+    try:
+        node1.query("""
+            CREATE TABLE default.freezing_table (
+                d Date,
+                s String
+            ) ENGINE = MergeTree
+            ORDER BY tuple()
+            PARTITION BY toYYYYMM(d)
+            SETTINGS storage_policy_name='small_jbod_with_external'
+        """)
+
+        for _ in range(5):
+            data = []
+            dates = []
+            for i in range(10):
+                data.append(get_random_string(1024 * 1024)) # 1MB value
+                dates.append("toDate('2019-03-05')")
+            node1.query("INSERT INTO freezing_table VALUES {}".format(','.join(["(" + d + ", '" + s + "')" for d, s in zip(dates, data)])))
+
+        disks = get_used_disks_for_table(node1, "freezing_table")
+        assert len(disks) > 1
+        assert node1.query("SELECT COUNT() FROM default.freezing_table") == "50\n"
+
+        node1.query("ALTER TABLE freezing_table FREEZE PARTITION 201903")
+
+        # check that the shadow files (backups) exist on both disks
+        node1.exec_in_container(["bash", "-c", "find /jbod1/clickhouse/shadow -name '*.mrk2' | grep '.*'"])
+        node1.exec_in_container(["bash", "-c", "find /external/clickhouse/shadow -name '*.mrk2' | grep '.*'"])
+
+    finally:
+        node1.query("DROP TABLE IF EXISTS default.freezing_table")