diff --git a/dbms/src/Storages/MergeTree/MergeList.cpp b/dbms/src/Storages/MergeTree/MergeList.cpp index 3e4537ad45c..77e6ea32da2 100644 --- a/dbms/src/Storages/MergeTree/MergeList.cpp +++ b/dbms/src/Storages/MergeTree/MergeList.cpp @@ -17,6 +17,7 @@ namespace DB MergeListElement::MergeListElement(const std::string & database_, const std::string & table_, const FutureMergedMutatedPart & future_part) : database{database_}, table{table_}, partition_id{future_part.part_info.partition_id} , result_part_name{future_part.name} + , result_part_path{future_part.path} , result_data_version{future_part.part_info.getDataVersion()} , num_parts{future_part.parts.size()} , thread_number{getThreadNumber()} @@ -24,6 +25,7 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str for (const auto & source_part : future_part.parts) { source_part_names.emplace_back(source_part->name); + source_part_paths.emplace_back(source_part->getFullPath()); std::shared_lock part_lock(source_part->columns_lock); @@ -54,6 +56,7 @@ MergeInfo MergeListElement::getInfo() const res.database = database; res.table = table; res.result_part_name = result_part_name; + res.result_part_path = result_part_path; res.partition_id = partition_id; res.is_mutation = is_mutation; res.elapsed = watch.elapsedSeconds(); @@ -73,6 +76,9 @@ MergeInfo MergeListElement::getInfo() const for (const auto & source_part_name : source_part_names) res.source_part_names.emplace_back(source_part_name); + for (const auto & source_part_path : source_part_paths) + res.source_part_paths.emplace_back(source_part_path); + return res; } diff --git a/dbms/src/Storages/MergeTree/MergeList.h b/dbms/src/Storages/MergeTree/MergeList.h index 0a25277a6ed..98c627db24c 100644 --- a/dbms/src/Storages/MergeTree/MergeList.h +++ b/dbms/src/Storages/MergeTree/MergeList.h @@ -28,7 +28,9 @@ struct MergeInfo std::string database; std::string table; std::string result_part_name; + std::string result_part_path; Array 
source_part_names; + Array source_part_paths; std::string partition_id; bool is_mutation; Float64 elapsed; @@ -55,11 +57,13 @@ struct MergeListElement : boost::noncopyable std::string partition_id; const std::string result_part_name; + const std::string result_part_path; Int64 result_data_version{}; bool is_mutation{}; UInt64 num_parts{}; Names source_part_names; + Names source_part_paths; Int64 source_data_version{}; Stopwatch watch; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index e1b9ee656a3..0034c841585 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -120,6 +120,11 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_) name = part_info.getPartName(); } +void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const DiskSpace::ReservationPtr & reservation) +{ + path = storage.getFullPathOnDisk(reservation->getDisk()) + name + "/"; +} + MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, size_t background_pool_size_) : data(data_), background_pool_size(background_pool_size_), log(&Logger::get(data.getLogName() + " (MergerMutator)")) { diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 4d87cfe35ff..8a1ceb40fea 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -17,6 +17,7 @@ class MergeProgressCallback; struct FutureMergedMutatedPart { String name; + String path; MergeTreePartInfo part_info; MergeTreeData::DataPartsVector parts; @@ -29,6 +30,7 @@ struct FutureMergedMutatedPart } void assign(MergeTreeData::DataPartsVector parts_); + void updatePath(const MergeTreeData & storage, const DiskSpace::ReservationPtr & reservation); }; diff --git 
a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp index 14e0191d6d4..132aebb70db 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp @@ -581,7 +581,6 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu void MergeTreeDataPart::loadIndexGranularity() { - String full_path = getFullPath(); index_granularity_info.changeGranularityIfRequired(full_path); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h index d684b2de276..22a2d81a05f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h @@ -23,6 +23,7 @@ namespace DB struct ColumnSize; class MergeTreeData; +struct FutureMergedMutatedPart; /// Description of the data part. diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index d3ff30d2d95..ee40e254f4d 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -343,7 +343,7 @@ struct CurrentlyMergingPartsTagger StorageMergeTree & storage; public: - CurrentlyMergingPartsTagger(const FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation) + CurrentlyMergingPartsTagger(FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation) : future_part(future_part_), storage(storage_) { /// Assume mutex is already locked, because this method is called from mergeTask. 
@@ -361,6 +361,8 @@ public: throw Exception("Not enough space for merging parts", ErrorCodes::NOT_ENOUGH_SPACE); } + future_part_.updatePath(storage, reserved_space); + for (const auto & part : future_part.parts) { if (storage.currently_merging_mutating_parts.count(part)) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 76b6c40126a..61f44628a3b 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1016,6 +1016,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) throw Exception("Future merged part name " + backQuote(future_merged_part.name) + " differs from part name in log entry: " + backQuote(entry.new_part_name), ErrorCodes::BAD_DATA_PART_NAME); } + future_merged_part.updatePath(*this, reserved_space); MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_merged_part); @@ -1156,6 +1157,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM future_mutated_part.parts.push_back(source_part); future_mutated_part.part_info = new_part_info; future_mutated_part.name = entry.new_part_name; + future_mutated_part.updatePath(*this, reserved_space); MergeList::EntryPtr merge_entry = global_context.getMergeList().insert( database_name, table_name, future_mutated_part); diff --git a/dbms/src/Storages/System/StorageSystemMerges.cpp b/dbms/src/Storages/System/StorageSystemMerges.cpp index 0f3b06a27de..1ff717ee9b9 100644 --- a/dbms/src/Storages/System/StorageSystemMerges.cpp +++ b/dbms/src/Storages/System/StorageSystemMerges.cpp @@ -16,6 +16,8 @@ NamesAndTypesList StorageSystemMerges::getNamesAndTypes() {"num_parts", std::make_shared()}, {"source_part_names", std::make_shared(std::make_shared())}, {"result_part_name", std::make_shared()}, + {"source_part_paths", std::make_shared(std::make_shared())}, + {"result_part_path", 
std::make_shared()}, {"partition_id", std::make_shared()}, {"is_mutation", std::make_shared()}, {"total_size_bytes_compressed", std::make_shared()}, @@ -45,6 +47,8 @@ void StorageSystemMerges::fillData(MutableColumns & res_columns, const Context & res_columns[i++]->insert(merge.num_parts); res_columns[i++]->insert(merge.source_part_names); res_columns[i++]->insert(merge.result_part_name); + res_columns[i++]->insert(merge.source_part_paths); + res_columns[i++]->insert(merge.result_part_path); res_columns[i++]->insert(merge.partition_id); res_columns[i++]->insert(merge.is_mutation); res_columns[i++]->insert(merge.total_size_bytes_compressed); diff --git a/dbms/tests/integration/test_system_merges/__init__.py b/dbms/tests/integration/test_system_merges/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/test_system_merges/configs/config.d/cluster.xml b/dbms/tests/integration/test_system_merges/configs/config.d/cluster.xml new file mode 100644 index 00000000000..ec7c9b8e4f8 --- /dev/null +++ b/dbms/tests/integration/test_system_merges/configs/config.d/cluster.xml @@ -0,0 +1,16 @@ + + + + + + node1 + 9000 + + + node2 + 9000 + + + + + \ No newline at end of file diff --git a/dbms/tests/integration/test_system_merges/configs/logs_config.xml b/dbms/tests/integration/test_system_merges/configs/logs_config.xml new file mode 100644 index 00000000000..bdf1bbc11c1 --- /dev/null +++ b/dbms/tests/integration/test_system_merges/configs/logs_config.xml @@ -0,0 +1,17 @@ + + 3 + + trace + /var/log/clickhouse-server/log.log + /var/log/clickhouse-server/log.err.log + 1000M + 10 + /var/log/clickhouse-server/stderr.log + /var/log/clickhouse-server/stdout.log + + + system + part_log
+ 500 +
+
diff --git a/dbms/tests/integration/test_system_merges/test.py b/dbms/tests/integration/test_system_merges/test.py
new file mode 100644
--- /dev/null
+++ b/dbms/tests/integration/test_system_merges/test.py
@@ -0,0 +1,175 @@
+import pytest
+import threading
+import time
+from helpers.cluster import ClickHouseCluster
+
+cluster = ClickHouseCluster(__file__)
+
+# Two replicas of one shard; the non-replicated test variants use node1 only.
+node1 = cluster.add_instance('node1',
+            config_dir='configs',
+            main_configs=['configs/logs_config.xml'],
+            with_zookeeper=True,
+            macros={"shard": 0, "replica": 1} )
+
+node2 = cluster.add_instance('node2',
+            config_dir='configs',
+            main_configs=['configs/logs_config.xml'],
+            with_zookeeper=True,
+            macros={"shard": 0, "replica": 2} )
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    """Start the cluster once per module and shut it down when the module is done."""
+    try:
+        cluster.start()
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+def split_tsv(data):
+    """Split tab-separated query output into rows, each row a list of string fields."""
+    return [ x.split("\t") for x in data.splitlines() ]
+
+
+def drop_table(nodes, name):
+    """Drop table `name` on every node; IF EXISTS keeps cleanup from masking an earlier failure."""
+    for node in nodes:
+        node.query("DROP TABLE IF EXISTS {name}".format(name=name))
+
+
+@pytest.mark.parametrize("replicated", [
+    "",
+    "replicated"
+])
+def test_merge_simple(started_cluster, replicated):
+    """system.merges must expose names and paths of the source and result parts of a running merge."""
+    clickhouse_path = "/var/lib/clickhouse"
+    name = "test_merge_simple"
+    nodes = [node1, node2] if replicated else [node1]
+    engine = "ReplicatedMergeTree('/clickhouse/test_merge_simple', '{replica}')" if replicated else "MergeTree()"
+    node_check = nodes[-1]
+    starting_block = 0 if replicated else 1
+
+    try:
+        for node in nodes:
+            node.query("""
+                CREATE TABLE {name}
+                (
+                    `a` Int64
+                )
+                ENGINE = {engine}
+                ORDER BY sleep(2)
+            """.format(engine=engine, name=name))
+
+        node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
+        node1.query("INSERT INTO {name} VALUES (2)".format(name=name))
+        node1.query("INSERT INTO {name} VALUES (3)".format(name=name))
+
+        parts = ["all_{}_{}_0".format(x, x) for x in range(starting_block, starting_block+3)]
+        result_part = "all_{}_{}_1".format(starting_block, starting_block+2)
+
+        def optimize():
+            node1.query("OPTIMIZE TABLE {name}".format(name=name))
+
+        # The final "no merges left" check runs no earlier than 5 seconds after the merge is requested.
+        deadline = time.time() + 5
+        t = threading.Thread(target=optimize)
+        t.start()
+        try:
+            # Presumably sleep(2) in ORDER BY keeps the merge running long enough to be observed here.
+            time.sleep(1)
+            assert split_tsv(node_check.query("""
+                SELECT database, table, num_parts, source_part_names, source_part_paths, result_part_name, result_part_path, partition_id, is_mutation
+                    FROM system.merges
+                WHERE table = '{name}'
+            """.format(name=name))) == [
+                [
+                    "default",
+                    name,
+                    "3",
+                    "['{}','{}','{}']".format(*parts),
+                    "['{clickhouse}/data/default/{name}/{}/','{clickhouse}/data/default/{name}/{}/','{clickhouse}/data/default/{name}/{}/']".format(*parts, clickhouse=clickhouse_path, name=name),
+                    result_part,
+                    "{clickhouse}/data/default/{name}/{}/".format(result_part, clickhouse=clickhouse_path, name=name),
+                    "all",
+                    "0"
+                ]
+            ]
+        finally:
+            # Join even if the assertion fails, so the table is not dropped under a running OPTIMIZE.
+            t.join()
+
+        time.sleep(max(0, deadline - time.time()))
+
+        assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=name)) == ""
+
+    finally:
+        drop_table(nodes, name)
+
+
+@pytest.mark.parametrize("replicated", [
+    "",
+    "replicated"
+])
+def test_mutation_simple(started_cluster, replicated):
+    """system.merges must expose names and paths of the source and result parts of a running mutation."""
+    clickhouse_path = "/var/lib/clickhouse"
+    name = "test_mutation_simple"
+    nodes = [node1, node2] if replicated else [node1]
+    engine = "ReplicatedMergeTree('/clickhouse/test_mutation_simple', '{replica}')" if replicated else "MergeTree()"
+    node_check = nodes[-1]
+    starting_block = 0 if replicated else 1
+
+    try:
+        for node in nodes:
+            node.query("""
+                CREATE TABLE {name}
+                (
+                    `a` Int64
+                )
+                ENGINE = {engine}
+                ORDER BY tuple()
+            """.format(engine=engine, name=name))
+
+        node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
+        part = "all_{}_{}_0".format(starting_block, starting_block)
+        result_part = "all_{}_{}_0_{}".format(starting_block, starting_block, starting_block+1)
+
+        def alter():
+            node1.query("ALTER TABLE {name} UPDATE a = 42 WHERE sleep(2) OR 1".format(name=name))
+
+        t = threading.Thread(target=alter)
+        t.start()
+        try:
+            time.sleep(1)
+            assert split_tsv(node_check.query("""
+                SELECT database, table, num_parts, source_part_names, source_part_paths, result_part_name, result_part_path, partition_id, is_mutation
+                    FROM system.merges
+                WHERE table = '{name}'
+            """.format(name=name))) == [
+                [
+                    "default",
+                    name,
+                    "1",
+                    "['{}']".format(part),
+                    "['{clickhouse}/data/default/{name}/{}/']".format(part, clickhouse=clickhouse_path, name=name),
+                    result_part,
+                    "{clickhouse}/data/default/{name}/{}/".format(result_part, clickhouse=clickhouse_path, name=name),
+                    "all",
+                    "1"
+                ],
+            ]
+        finally:
+            # Join even if the assertion fails, so the table is not dropped under a running ALTER.
+            t.join()
+
+        time.sleep(1.5)
+
+        assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=name)) == ""
+
+    finally:
+        drop_table(nodes, name)