Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-25 00:52:02 +00:00
Added test_multiple_disk::test_kill_while_insert, reworked the log message, and fixed the logic for treating stale copies.
parent a2f238d8da
commit 6afd8d7805
@@ -811,10 +811,11 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
             Poco::Path marker_path(part_path, DELETE_ON_DESTROY_MARKER_PATH);
             if (Poco::File(marker_path).exists())
             {
-                LOG_WARNING(log, "Detaching stale part " << getFullPathOnDisk(part_disk_ptr) << part_name << ", which should have been deleted after a move. That can only happen after unclean restart of ClickHouse.");
+                LOG_WARNING(log, "Detaching stale part " << getFullPathOnDisk(part_disk_ptr) << part_name << ", which should have been deleted after a move. That can only happen after an unclean restart of ClickHouse following a move of a part while an operation was still blocking deletion of the stale copy.");
                 std::lock_guard loading_lock(mutex);
                 broken_parts_to_detach.push_back(part);
                 ++suspicious_broken_parts;
                 return;
             }

             try
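The C++ change hinges on a marker-file pattern: when a part is moved to another disk, the source copy is tagged with a delete-on-destroy marker; if the server dies before that copy is actually removed, the marker survives, and on the next load the copy is detached instead of being loaded as duplicate data. A minimal Python sketch of the same idea (marker name and function shape are illustrative assumptions, not the actual ClickHouse implementation):

    import os

    DELETE_ON_DESTROY_MARKER = "delete-on-destroy.txt"  # marker name assumed for illustration

    def load_part_copy(part_path, part_name, broken_parts_to_detach):
        # A surviving marker means this copy should have been deleted after
        # a move: detach it instead of loading duplicate data.
        marker = os.path.join(part_path, DELETE_ON_DESTROY_MARKER)
        if os.path.exists(marker):
            print("Detaching stale part {}{}".format(part_path, part_name))
            broken_parts_to_detach.append(part_name)
            return False  # caller skips this copy
        return True  # safe to load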
@@ -628,11 +628,11 @@ class ClickHouseInstance:
     def http_query(self, sql, data=None):
         return urllib.urlopen("http://" + self.ip_address + ":8123/?query=" + urllib.quote(sql, safe=''), data).read()

-    def restart_clickhouse(self, stop_start_wait_sec=5):
+    def restart_clickhouse(self, stop_start_wait_sec=5, kill=False):
         if not self.stay_alive:
             raise Exception("clickhouse can be restarted only with stay_alive=True instance")

-        self.exec_in_container(["bash", "-c", "pkill clickhouse"], user='root')
+        self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root')
         time.sleep(stop_start_wait_sec)
         self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user=str(os.getuid()))
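The new kill flag selects the signal used to stop the server: by default pkill sends SIGTERM for a graceful shutdown, while kill=True adds -9 (SIGKILL) to simulate an unclean crash. Usage, as exercised by the test further below:

    node1.restart_clickhouse()           # graceful stop, then start
    node1.restart_clickhouse(kill=True)  # SIGKILL: unclean restart, no shutdown hooks run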
@@ -3,6 +3,7 @@ import pytest
 import random
 import re
 import string
+import threading
 import time
 from multiprocessing.dummy import Pool
 from helpers.client import QueryRuntimeException
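The new test also calls get_random_string, a helper defined elsewhere in this file and not part of the diff; a typical definition consistent with the random and string imports above (an assumption, not taken from this commit):

    import random
    import string

    def get_random_string(length):
        # Uniform ASCII payload; one char per byte, so `length` controls part size.
        return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))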
@@ -15,6 +16,7 @@ node1 = cluster.add_instance('node1',
                              config_dir='configs',
                              main_configs=['configs/logs_config.xml'],
                              with_zookeeper=True,
+                             stay_alive=True,
                              tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/external:size=200M'],
                              macros={"shard": 0, "replica": 1})

@@ -22,6 +24,7 @@ node2 = cluster.add_instance('node2',
                              config_dir='configs',
                              main_configs=['configs/logs_config.xml'],
                              with_zookeeper=True,
+                             stay_alive=True,
                              tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/external:size=200M'],
                              macros={"shard": 0, "replica": 2})
@@ -1028,6 +1031,7 @@ def test_rename(start_cluster):
         node1.query("DROP TABLE IF EXISTS default.renaming_table1")
         node1.query("DROP TABLE IF EXISTS test.renaming_table2")


 def test_freeze(start_cluster):
     try:
         node1.query("""
@@ -1057,6 +1061,50 @@ def test_freeze(start_cluster):
         node1.exec_in_container(["bash", "-c", "find /jbod1/shadow -name '*.mrk2' | grep '.*'"])
         node1.exec_in_container(["bash", "-c", "find /external/shadow -name '*.mrk2' | grep '.*'"])

     finally:
         node1.query("DROP TABLE IF EXISTS default.freezing_table")
+
+
+def test_kill_while_insert(start_cluster):
+    try:
+        name = "test_kill_while_merge"
+
+        node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
+
+        node1.query("""
+            CREATE TABLE {name} (
+                s String
+            ) ENGINE = MergeTree
+            ORDER BY tuple()
+            SETTINGS storage_policy='small_jbod_with_external'
+        """.format(name=name))
+
+        data = []
+        dates = []
+        for i in range(10):
+            data.append(get_random_string(1024 * 1024)) # 1MB value
+        node1.query("INSERT INTO {name} VALUES {}".format(','.join(["('" + s + "')" for s in data]), name=name))
+
+        disks = get_used_disks_for_table(node1, name)
+        assert set(disks) == {"jbod1"}
+
+        start_time = time.time()
+        long_select = threading.Thread(target=node1.query, args=("SELECT sleep(3) FROM {name}".format(name=name),))
+        long_select.start()
+
+        time.sleep(0.5)
+
+        node1.query("ALTER TABLE {name} MOVE PARTITION tuple() TO DISK 'external'".format(name=name))
+        assert time.time() - start_time < 2
+        node1.restart_clickhouse(kill=True)
+
+        try:
+            long_select.join()
+        except:
+            """"""
+
+        time.sleep(0.5)
+        assert node1.query("SELECT count() FROM {name}".format(name=name)).splitlines() == ["10"]
+
+    finally:
+        """Don't drop table afterwards to not shadow assertion."""
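Reading the test end to end: the 3-second SELECT pins the original copy of the part on jbod1 while MOVE PARTITION relocates it to the external disk, and the start_time assertion checks that the move did not wait for the SELECT to finish. Killing the server at that moment leaves the stale jbod1 copy behind together with its delete-on-destroy marker, and after the unclean restart the final count() assertion verifies that the stale copy was detached rather than loaded as duplicate data. To run just this test (invocation assumed; adjust to the local integration-test setup):

    pytest test_multiple_disks/test.py -k test_kill_while_insert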