Merge pull request #7810 from excitoon-favorites/deleteondestroy

Ignore redundant copies of parts after move and restart
alesapin 2019-12-10 12:41:30 +03:00 committed by GitHub
commit 650277afb0
4 changed files with 84 additions and 4 deletions

View File

@@ -96,6 +96,12 @@ namespace ErrorCodes
}
namespace
{
const char * DELETE_ON_DESTROY_MARKER_PATH = "delete-on-destroy.txt";
}
MergeTreeData::MergeTreeData(
const String & database_,
const String & table_,
@@ -801,6 +807,17 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
part->relative_path = part_name;
bool broken = false;
Poco::Path part_path(getFullPathOnDisk(part_disk_ptr), part_name);
Poco::Path marker_path(part_path, DELETE_ON_DESTROY_MARKER_PATH);
if (Poco::File(marker_path).exists())
{
LOG_WARNING(log, "Detaching stale part " << getFullPathOnDisk(part_disk_ptr) << part_name << ", which should have been deleted after a move. This can only happen after an unclean restart of ClickHouse following a move of a part, while some operation was still blocking removal of the stale copy.");
std::lock_guard loading_lock(mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
return;
}
try
{
part->loadColumnsChecksumsIndexes(require_part_metadata, true);
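The hunk above is the consuming side of the change: while loading parts at startup, any part directory that still contains delete-on-destroy.txt is treated as a stale copy left behind by a move that was interrupted by an unclean restart, and it is detached instead of loaded as a second copy of the same data. A minimal Python sketch of that idea, assuming a flat <table>/<part> directory layout with a detached/ subdirectory (the helper name is made up; the real implementation is the C++ above):

import os
import shutil

DELETE_ON_DESTROY_MARKER = "delete-on-destroy.txt"  # same file name as DELETE_ON_DESTROY_MARKER_PATH above

def load_part_dirs(table_path):
    # Sketch of the new check in loadDataParts: a part directory that still
    # carries the marker is a stale copy left behind by an interrupted move,
    # so it is moved to detached/ instead of being loaded.
    detached_dir = os.path.join(table_path, "detached")
    if not os.path.isdir(detached_dir):
        os.makedirs(detached_dir)
    loaded, detached = [], []
    for part_name in sorted(os.listdir(table_path)):
        part_path = os.path.join(table_path, part_name)
        if part_name == "detached" or not os.path.isdir(part_path):
            continue
        if os.path.exists(os.path.join(part_path, DELETE_ON_DESTROY_MARKER)):
            shutil.move(part_path, os.path.join(detached_dir, part_name))
            detached.append(part_name)
        else:
            loaded.append(part_name)
    return loaded, detached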
@@ -2515,7 +2532,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
{
auto lock = lockParts();
for (const auto & original_active_part : getDataPartsStateRange(DataPartState::Committed))
for (auto original_active_part : getDataPartsStateRange(DataPartState::Committed))
{
if (part_copy->name == original_active_part->name)
{
@@ -2528,6 +2545,16 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
auto part_it = data_parts_indexes.insert(part_copy).first;
modifyPartState(part_it, DataPartState::Committed);
Poco::Path marker_path(Poco::Path(original_active_part->getFullPath()), DELETE_ON_DESTROY_MARKER_PATH);
try
{
Poco::File(marker_path).createFile();
}
catch (Poco::Exception & e)
{
LOG_ERROR(log, e.what() << " (while creating DeleteOnDestroy marker: " + backQuote(marker_path.toString()) + ")");
}
return;
}
}
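This is the producing side: after a part has been moved, swapActivePart makes the copy on the destination disk the committed one and drops the marker file into the old copy's directory. The old copy cannot be removed right away because a running query may still be reading from it, so it is only marked, and a failure to create the marker is logged rather than thrown. A small Python sketch of the same pattern, with a hypothetical helper name:

import os

DELETE_ON_DESTROY_MARKER = "delete-on-destroy.txt"

def mark_old_copy_for_deletion(old_part_path):
    # Mark the superseded copy instead of deleting it immediately; the marker
    # is picked up either when the in-memory part object is destroyed or, after
    # an unclean restart, by the loadDataParts check sketched above.
    marker = os.path.join(old_part_path, DELETE_ON_DESTROY_MARKER)
    try:
        open(marker, "a").close()
    except OSError as exc:
        print("while creating DeleteOnDestroy marker `{}`: {}".format(marker, exc))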

View File

@@ -346,6 +346,11 @@ MergeTreeDataPart::~MergeTreeDataPart()
}
dir.remove(true);
if (state == State::DeleteOnDestroy)
{
LOG_TRACE(storage.log, "Removed part from old location " << path);
}
}
catch (...)
{

View File

@@ -628,11 +628,11 @@ class ClickHouseInstance:
def http_query(self, sql, data=None):
return urllib.urlopen("http://" + self.ip_address + ":8123/?query=" + urllib.quote(sql, safe=''), data).read()
def restart_clickhouse(self, stop_start_wait_sec=5):
def restart_clickhouse(self, stop_start_wait_sec=5, kill=False):
if not self.stay_alive:
raise Exception("clickhouse can be restarted only with stay_alive=True instance")
self.exec_in_container(["bash", "-c", "pkill clickhouse"], user='root')
self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root')
time.sleep(stop_start_wait_sec)
self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user=str(os.getuid()))
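The default restart still stops the server with a plain pkill, i.e. SIGTERM and a clean shutdown; kill=True switches to pkill -9, so a test can simulate an unclean restart in which the server never gets a chance to remove the old copy itself. A usage sketch (the test name and wait time are made up):

def test_unclean_restart(start_cluster):  # hypothetical test, for illustration only
    node1.restart_clickhouse()                                    # SIGTERM: clean shutdown and restart
    node1.restart_clickhouse(kill=True, stop_start_wait_sec=10)   # SIGKILL: unclean restart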

View File

@@ -3,6 +3,7 @@ import pytest
import random
import re
import string
import threading
import time
from multiprocessing.dummy import Pool
from helpers.client import QueryRuntimeException
@@ -15,6 +16,7 @@ node1 = cluster.add_instance('node1',
config_dir='configs',
main_configs=['configs/logs_config.xml'],
with_zookeeper=True,
stay_alive=True,
tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/external:size=200M'],
macros={"shard": 0, "replica": 1} )
@@ -22,6 +24,7 @@ node2 = cluster.add_instance('node2',
config_dir='configs',
main_configs=['configs/logs_config.xml'],
with_zookeeper=True,
stay_alive=True,
tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/external:size=200M'],
macros={"shard": 0, "replica": 2} )
@@ -1028,6 +1031,7 @@ def test_rename(start_cluster):
node1.query("DROP TABLE IF EXISTS default.renaming_table1")
node1.query("DROP TABLE IF EXISTS test.renaming_table2")
def test_freeze(start_cluster):
try:
node1.query("""
@@ -1057,6 +1061,50 @@ def test_freeze(start_cluster):
node1.exec_in_container(["bash", "-c", "find /jbod1/shadow -name '*.mrk2' | grep '.*'"])
node1.exec_in_container(["bash", "-c", "find /external/shadow -name '*.mrk2' | grep '.*'"])
finally:
node1.query("DROP TABLE IF EXISTS default.freezing_table")
def test_kill_while_insert(start_cluster):
try:
name = "test_kill_while_insert"
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
node1.query("""
CREATE TABLE {name} (
s String
) ENGINE = MergeTree
ORDER BY tuple()
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name))
data = []
dates = []
for i in range(10):
data.append(get_random_string(1024 * 1024)) # 1MB value
node1.query("INSERT INTO {name} VALUES {}".format(','.join(["('" + s + "')" for s in data]), name=name))
disks = get_used_disks_for_table(node1, name)
assert set(disks) == {"jbod1"}
start_time = time.time()
long_select = threading.Thread(target=node1.query, args=("SELECT sleep(3) FROM {name}".format(name=name),))
long_select.start()
time.sleep(0.5)
node1.query("ALTER TABLE {name} MOVE PARTITION tuple() TO DISK 'external'".format(name=name))
assert time.time() - start_time < 2
node1.restart_clickhouse(kill=True)
try:
long_select.join()
except:
""""""
time.sleep(0.5)
assert node1.query("SELECT count() FROM {name}".format(name=name)).splitlines() == ["10"]
finally:
"""Don't drop table afterwards to not shadow assertion."""