Merge pull request #8043 from excitoon-favorites/systemmergespaths

Added information about paths to `system.merges`.
This commit is contained in:
alexey-milovidov 2019-12-08 23:30:39 +03:00 committed by GitHub
commit 16754bfff5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 220 additions and 2 deletions

View File

@ -17,6 +17,7 @@ namespace DB
MergeListElement::MergeListElement(const std::string & database_, const std::string & table_, const FutureMergedMutatedPart & future_part)
: database{database_}, table{table_}, partition_id{future_part.part_info.partition_id}
, result_part_name{future_part.name}
, result_part_path{future_part.path}
, result_data_version{future_part.part_info.getDataVersion()}
, num_parts{future_part.parts.size()}
, thread_number{getThreadNumber()}
@ -24,6 +25,7 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str
for (const auto & source_part : future_part.parts)
{
source_part_names.emplace_back(source_part->name);
source_part_paths.emplace_back(source_part->getFullPath());
std::shared_lock<std::shared_mutex> part_lock(source_part->columns_lock);
@ -54,6 +56,7 @@ MergeInfo MergeListElement::getInfo() const
res.database = database;
res.table = table;
res.result_part_name = result_part_name;
res.result_part_path = result_part_path;
res.partition_id = partition_id;
res.is_mutation = is_mutation;
res.elapsed = watch.elapsedSeconds();
@ -73,6 +76,9 @@ MergeInfo MergeListElement::getInfo() const
for (const auto & source_part_name : source_part_names)
res.source_part_names.emplace_back(source_part_name);
for (const auto & source_part_path : source_part_paths)
res.source_part_paths.emplace_back(source_part_path);
return res;
}

View File

@ -28,7 +28,9 @@ struct MergeInfo
std::string database;
std::string table;
std::string result_part_name;
std::string result_part_path;
Array source_part_names;
Array source_part_paths;
std::string partition_id;
bool is_mutation;
Float64 elapsed;
@ -55,11 +57,13 @@ struct MergeListElement : boost::noncopyable
std::string partition_id;
const std::string result_part_name;
const std::string result_part_path;
Int64 result_data_version{};
bool is_mutation{};
UInt64 num_parts{};
Names source_part_names;
Names source_part_paths;
Int64 source_data_version{};
Stopwatch watch;

View File

@ -120,6 +120,11 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
name = part_info.getPartName();
}
void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const DiskSpace::ReservationPtr & reservation)
{
path = storage.getFullPathOnDisk(reservation->getDisk()) + name + "/";
}
MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, size_t background_pool_size_)
: data(data_), background_pool_size(background_pool_size_), log(&Logger::get(data.getLogName() + " (MergerMutator)"))
{

View File

@ -17,6 +17,7 @@ class MergeProgressCallback;
struct FutureMergedMutatedPart
{
String name;
String path;
MergeTreePartInfo part_info;
MergeTreeData::DataPartsVector parts;
@ -29,6 +30,7 @@ struct FutureMergedMutatedPart
}
void assign(MergeTreeData::DataPartsVector parts_);
void updatePath(const MergeTreeData & storage, const DiskSpace::ReservationPtr & reservation);
};

View File

@ -581,7 +581,6 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu
void MergeTreeDataPart::loadIndexGranularity()
{
String full_path = getFullPath();
index_granularity_info.changeGranularityIfRequired(full_path);

View File

@ -23,6 +23,7 @@ namespace DB
struct ColumnSize;
class MergeTreeData;
struct FutureMergedMutatedPart;
/// Description of the data part.

View File

@ -343,7 +343,7 @@ struct CurrentlyMergingPartsTagger
StorageMergeTree & storage;
public:
CurrentlyMergingPartsTagger(const FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation)
CurrentlyMergingPartsTagger(FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation)
: future_part(future_part_), storage(storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
@ -361,6 +361,8 @@ public:
throw Exception("Not enough space for merging parts", ErrorCodes::NOT_ENOUGH_SPACE);
}
future_part_.updatePath(storage, reserved_space);
for (const auto & part : future_part.parts)
{
if (storage.currently_merging_mutating_parts.count(part))

View File

@ -1016,6 +1016,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
throw Exception("Future merged part name " + backQuote(future_merged_part.name) + " differs from part name in log entry: "
+ backQuote(entry.new_part_name), ErrorCodes::BAD_DATA_PART_NAME);
}
future_merged_part.updatePath(*this, reserved_space);
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_merged_part);
@ -1156,6 +1157,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
future_mutated_part.parts.push_back(source_part);
future_mutated_part.part_info = new_part_info;
future_mutated_part.name = entry.new_part_name;
future_mutated_part.updatePath(*this, reserved_space);
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(
database_name, table_name, future_mutated_part);

View File

@ -16,6 +16,8 @@ NamesAndTypesList StorageSystemMerges::getNamesAndTypes()
{"num_parts", std::make_shared<DataTypeUInt64>()},
{"source_part_names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"result_part_name", std::make_shared<DataTypeString>()},
{"source_part_paths", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"result_part_path", std::make_shared<DataTypeString>()},
{"partition_id", std::make_shared<DataTypeString>()},
{"is_mutation", std::make_shared<DataTypeUInt8>()},
{"total_size_bytes_compressed", std::make_shared<DataTypeUInt64>()},
@ -45,6 +47,8 @@ void StorageSystemMerges::fillData(MutableColumns & res_columns, const Context &
res_columns[i++]->insert(merge.num_parts);
res_columns[i++]->insert(merge.source_part_names);
res_columns[i++]->insert(merge.result_part_name);
res_columns[i++]->insert(merge.source_part_paths);
res_columns[i++]->insert(merge.result_part_path);
res_columns[i++]->insert(merge.partition_id);
res_columns[i++]->insert(merge.is_mutation);
res_columns[i++]->insert(merge.total_size_bytes_compressed);

View File

@ -0,0 +1,16 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,17 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
<part_log>
<database>system</database>
<table>part_log</table>
<flush_interval_milliseconds>500</flush_interval_milliseconds>
</part_log>
</yandex>

View File

@ -0,0 +1,160 @@
import pytest
import threading
import time
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1',
config_dir='configs',
main_configs=['configs/logs_config.xml'],
with_zookeeper=True,
macros={"shard": 0, "replica": 1} )
node2 = cluster.add_instance('node2',
config_dir='configs',
main_configs=['configs/logs_config.xml'],
with_zookeeper=True,
macros={"shard": 0, "replica": 2} )
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def split_tsv(data):
return [ x.split("\t") for x in data.splitlines() ]
@pytest.mark.parametrize("replicated", [
"",
"replicated"
])
def test_merge_simple(started_cluster, replicated):
try:
clickhouse_path = "/var/lib/clickhouse"
name = "test_merge_simple"
nodes = [node1, node2] if replicated else [node1]
engine = "ReplicatedMergeTree('/clickhouse/test_merge_simple', '{replica}')" if replicated else "MergeTree()"
node_check = nodes[-1]
starting_block = 0 if replicated else 1
for node in nodes:
node.query("""
CREATE TABLE {name}
(
`a` Int64
)
ENGINE = {engine}
ORDER BY sleep(2)
""".format(engine=engine, name=name))
node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
node1.query("INSERT INTO {name} VALUES (2)".format(name=name))
node1.query("INSERT INTO {name} VALUES (3)".format(name=name))
parts = ["all_{}_{}_0".format(x, x) for x in range(starting_block, starting_block+3)]
result_part = "all_{}_{}_1".format(starting_block, starting_block+2)
def optimize():
node1.query("OPTIMIZE TABLE {name}".format(name=name))
wait = threading.Thread(target=time.sleep, args=(5,))
wait.start()
t = threading.Thread(target=optimize)
t.start()
time.sleep(1)
assert split_tsv(node_check.query("""
SELECT database, table, num_parts, source_part_names, source_part_paths, result_part_name, result_part_path, partition_id, is_mutation
FROM system.merges
WHERE table = '{name}'
""".format(name=name))) == [
[
"default",
name,
"3",
"['{}','{}','{}']".format(*parts),
"['{clickhouse}/data/default/{name}/{}/','{clickhouse}/data/default/{name}/{}/','{clickhouse}/data/default/{name}/{}/']".format(*parts, clickhouse=clickhouse_path, name=name),
result_part,
"{clickhouse}/data/default/{name}/{}/".format(result_part, clickhouse=clickhouse_path, name=name),
"all",
"0"
]
]
t.join()
wait.join()
assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=name)) == ""
finally:
for node in nodes:
node.query("DROP TABLE {name}".format(name=name))
@pytest.mark.parametrize("replicated", [
"",
"replicated"
])
def test_mutation_simple(started_cluster, replicated):
try:
clickhouse_path = "/var/lib/clickhouse"
name = "test_mutation_simple"
nodes = [node1, node2] if replicated else [node1]
engine = "ReplicatedMergeTree('/clickhouse/test_mutation_simple', '{replica}')" if replicated else "MergeTree()"
node_check = nodes[-1]
starting_block = 0 if replicated else 1
for node in nodes:
node.query("""
CREATE TABLE {name}
(
`a` Int64
)
ENGINE = {engine}
ORDER BY tuple()
""".format(engine=engine, name=name))
node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
part = "all_{}_{}_0".format(starting_block, starting_block)
result_part = "all_{}_{}_0_{}".format(starting_block, starting_block, starting_block+1)
def alter():
node1.query("ALTER TABLE {name} UPDATE a = 42 WHERE sleep(2) OR 1".format(name=name))
t = threading.Thread(target=alter)
t.start()
time.sleep(1)
assert split_tsv(node_check.query("""
SELECT database, table, num_parts, source_part_names, source_part_paths, result_part_name, result_part_path, partition_id, is_mutation
FROM system.merges
WHERE table = '{name}'
""".format(name=name))) == [
[
"default",
name,
"1",
"['{}']".format(part),
"['{clickhouse}/data/default/{name}/{}/']".format(part, clickhouse=clickhouse_path, name=name),
result_part,
"{clickhouse}/data/default/{name}/{}/".format(result_part, clickhouse=clickhouse_path, name=name),
"all",
"1"
],
]
t.join()
time.sleep(1.5)
assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=name)) == ""
finally:
for node in nodes:
node.query("DROP TABLE {name}".format(name=name))