fix some tests

Alexander Tokmakov 2020-01-28 22:39:52 +03:00
parent 9d0ac7aae1
commit b82693cb07
10 changed files with 73 additions and 58 deletions

View File

@@ -135,9 +135,11 @@ void DatabaseAtomic::dropTable(const Context & context, const String & table_name
     {
         LOG_INFO(log, "Mark table " + table->getStorageID().getNameForLogs() + " to drop.");
+        /// Context::getPath acquires lock
+        auto data_path = context.getPath() + table_data_path_relative;
         std::lock_guard lock(tables_to_drop_mutex);
         time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
-        tables_to_drop.push_back({table, context.getPath() + table_data_path_relative, current_time});
+        tables_to_drop.push_back({table, data_path, current_time});
     }
 }

 catch (...)
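The new data_path variable is not just cosmetic: Context::getPath() takes the Context lock, so calling it while tables_to_drop_mutex is held would nest the two locks. A minimal standalone sketch of the pattern (hypothetical names, not the ClickHouse API):

#include <mutex>
#include <string>
#include <vector>

struct DropQueue
{
    std::mutex context_mutex;                  // stands in for the Context lock
    std::mutex list_mutex;                     // stands in for tables_to_drop_mutex
    std::string base_path = "/var/lib/clickhouse/";
    std::vector<std::string> tables_to_drop;

    std::string getPath()                      // acquires the "Context" lock
    {
        std::lock_guard<std::mutex> lock(context_mutex);
        return base_path;
    }

    void scheduleDrop(const std::string & relative_path)
    {
        /// Compute the path first: context_mutex is taken and released here,
        /// so the two mutexes are never held at the same time.
        auto data_path = getPath() + relative_path;
        std::lock_guard<std::mutex> lock(list_mutex);
        tables_to_drop.push_back(data_path);
        /// Calling getPath() inside this scope would nest context_mutex
        /// under list_mutex and risk lock-order inversion.
    }
};

int main()
{
    DropQueue queue;
    queue.scheduleDrop("data/test/table/");
}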

View File

@@ -351,7 +351,7 @@ void DatabaseOnDisk::iterateMetadataFiles(const Context & context,
     const std::string object_name = file_name.substr(0, file_name.size() - strlen(tmp_drop_ext));
     if (Poco::File(context.getPath() + getDataPath() + '/' + object_name).exists())
     {
-        Poco::File(getMetadataPath() + file_name).renameTo(context.getPath() + getMetadataPath() + object_name + ".sql");
+        Poco::File(getMetadataPath() + file_name).renameTo(getMetadataPath() + object_name + ".sql");
         LOG_WARNING(log, "Object " << backQuote(object_name) << " was not dropped previously and will be restored");
         process_metadata_file(object_name + ".sql");
     }
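The dropped context.getPath() prefix matters because the rename source on the same line is built from getMetadataPath() alone, which suggests that path is already usable as-is; prepending the server root to the destination as well would double it. A tiny self-contained sketch of the mismatch (paths illustrative, assuming getMetadataPath() returns an already-rooted path):

#include <cassert>
#include <string>

int main()
{
    std::string root = "/var/lib/clickhouse/";
    std::string metadata_path = root + "metadata/db/";   // already rooted, like getMetadataPath()

    std::string destination_before = root + metadata_path + "t.sql";  // root applied twice
    std::string destination_after  = metadata_path + "t.sql";

    assert(destination_after == "/var/lib/clickhouse/metadata/db/t.sql");
    assert(destination_before != destination_after);
}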

View File

@@ -402,6 +402,8 @@ public:
     std::chrono::steady_clock::duration closeSessions() const;

     /// For methods below you may need to acquire a lock by yourself.
+    /// NOTE: It's dangerous. While holding the Context lock you should not take any other locks or call methods that may take any locks,
+    /// until you are sure that all locks are always taken in the right order.
     std::unique_lock<std::recursive_mutex> getLock() const;

     const Context & getQueryContext() const;
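The added NOTE states the classic lock-ordering rule. When two locks genuinely must be held together, one way to satisfy it is to acquire them atomically; a minimal sketch (illustrative names only, not the ClickHouse API):

#include <mutex>

std::recursive_mutex context_mutex;   // plays the role of the Context lock
std::mutex other_mutex;               // any other lock in the system

void work_under_both()
{
    /// std::scoped_lock acquires both mutexes with a deadlock-avoidance
    /// algorithm, so it is safe even if another thread requests them
    /// in the opposite order.
    std::scoped_lock lock(context_mutex, other_mutex);
    /// ... touch state guarded by both locks ...
}

int main()
{
    work_under_both();
}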

View File

@@ -222,6 +222,7 @@ BlockIO InterpreterDropQuery::executeToDatabase(String & database_name, ASTDropQ
         executeToDictionary(database_name, current_dictionary, kind, false, false, false);
     }

+    //FIXME get rid of global context lock here
     auto context_lock = context.getLock();

     /// Someone could have time to delete the database before us.
@@ -231,14 +232,14 @@ BlockIO InterpreterDropQuery::executeToDatabase(String & database_name, ASTDropQ
         if (!context.getDatabase(database_name)->empty(context))
             throw Exception("New table appeared in database being dropped. Try dropping it again.", ErrorCodes::DATABASE_NOT_EMPTY);
-        /// Delete database information from the RAM
-        context.detachDatabase(database_name);

         database->shutdown();

         /// Delete the database.
         database->drop(context);

+        /// Delete database information from the RAM
+        context.detachDatabase(database_name);

         /// Old ClickHouse versions did not store database.sql files
         Poco::File database_metadata_file(context.getPath() + "metadata/" + escapeForFileName(database_name) + ".sql");
         if (database_metadata_file.exists())
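Moving detachDatabase below drop() plausibly changes the failure behavior: if shutdown() or the on-disk removal throws, the database is still registered in RAM rather than half-removed. A sketch of the resulting sequence, with hypothetical stand-ins for the Context registry:

#include <map>
#include <memory>
#include <string>

struct Database
{
    void shutdown() {}        // stop background threads and flushes
    void drop()     {}        // delete on-disk state; may throw
};

/// Stands in for the database registry the Context keeps in RAM.
std::map<std::string, std::shared_ptr<Database>> databases;

void dropDatabase(const std::string & name)
{
    auto database = databases.at(name);
    database->shutdown();
    database->drop();          // if this throws, the entry is still in RAM
    databases.erase(name);     // detach from RAM only after disk state is gone
}

int main()
{
    databases.emplace("test", std::make_shared<Database>());
    dropDatabase("test");
}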

View File

@@ -13,7 +13,7 @@ node1 = cluster.add_instance('node1', config_dir="configs", with_zookeeper=True)
 def start_cluster():
     try:
         cluster.start()
-        node1.query("CREATE DATABASE zktest;")
+        node1.query("CREATE DATABASE zktest ENGINE=Ordinary;")
         node1.query(
             '''
             CREATE TABLE zktest.atomic_drop_table (n UInt32)

View File

@@ -16,6 +16,7 @@ node = cluster.add_instance('node', config_dir="configs", main_configs=['configs
 def started_cluster():
     try:
         cluster.start()
+        node.query("create database test engine=Ordinary")

         yield cluster
     finally:
@@ -23,37 +24,37 @@ def started_cluster():

 def test_single_file(started_cluster):
-    node.query("create table distr_1 (x UInt64, s String) engine = Distributed('test_cluster', database, table)")
-    node.query("insert into distr_1 values (1, 'a'), (2, 'bb'), (3, 'ccc')")
+    node.query("create table test.distr_1 (x UInt64, s String) engine = Distributed('test_cluster', database, table)")
+    node.query("insert into test.distr_1 values (1, 'a'), (2, 'bb'), (3, 'ccc')")

-    query = "select * from file('/var/lib/clickhouse/data/default/distr_1/default@not_existing:9000/1.bin', 'Distributed')"
+    query = "select * from file('/var/lib/clickhouse/data/test/distr_1/default@not_existing:9000/1.bin', 'Distributed')"
     out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
     assert out == '1\ta\n2\tbb\n3\tccc\n'

-    query = "create table t (dummy UInt32) engine = File('Distributed', '/var/lib/clickhouse/data/default/distr_1/default@not_existing:9000/1.bin');" \
+    query = "create table t (dummy UInt32) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_1/default@not_existing:9000/1.bin');" \
        "select * from t"
     out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
     assert out == '1\ta\n2\tbb\n3\tccc\n'

-    node.query("drop table distr_1")
+    node.query("drop table test.distr_1")

 def test_two_files(started_cluster):
-    node.query("create table distr_2 (x UInt64, s String) engine = Distributed('test_cluster', database, table)")
-    node.query("insert into distr_2 values (0, '_'), (1, 'a')")
-    node.query("insert into distr_2 values (2, 'bb'), (3, 'ccc')")
+    node.query("create table test.distr_2 (x UInt64, s String) engine = Distributed('test_cluster', database, table)")
+    node.query("insert into test.distr_2 values (0, '_'), (1, 'a')")
+    node.query("insert into test.distr_2 values (2, 'bb'), (3, 'ccc')")

-    query = "select * from file('/var/lib/clickhouse/data/default/distr_2/default@not_existing:9000/{1,2,3,4}.bin', 'Distributed') order by x"
+    query = "select * from file('/var/lib/clickhouse/data/test/distr_2/default@not_existing:9000/{1,2,3,4}.bin', 'Distributed') order by x"
     out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
     assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n'

-    query = "create table t (dummy UInt32) engine = File('Distributed', '/var/lib/clickhouse/data/default/distr_2/default@not_existing:9000/{1,2,3,4}.bin');" \
+    query = "create table t (dummy UInt32) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_2/default@not_existing:9000/{1,2,3,4}.bin');" \
        "select * from t order by x"
     out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
     assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n'

-    node.query("drop table distr_2")
+    node.query("drop table test.distr_2")

View File

@@ -16,6 +16,7 @@ node = cluster.add_instance('node',
 def start_cluster():
     try:
         cluster.start()
+        node.query('CREATE DATABASE test ENGINE=Ordinary')
         yield cluster
     finally:
         cluster.shutdown()
@@ -25,41 +26,41 @@ def _files_in_dist_mon(node, root, table):
         'bash',
         '-c',
         # `-maxdepth 1` to avoid /tmp/ subdirectory
-        'find /{root}/data/default/{table}/default@127%2E0%2E0%2E2:9000 -maxdepth 1 -type f | wc -l'.format(root=root, table=table)
+        'find /{root}/data/test/{table}/default@127%2E0%2E0%2E2:9000 -maxdepth 1 -type f | wc -l'.format(root=root, table=table)
     ]).split('\n')[0])

 def test_different_versions(start_cluster):
-    node.query('CREATE TABLE foo (key Int) Engine=Memory()')
+    node.query('CREATE TABLE test.foo (key Int) Engine=Memory()')

     node.query("""
-    CREATE TABLE dist_foo (key Int)
+    CREATE TABLE test.dist_foo (key Int)
     Engine=Distributed(
         test_cluster_two_shards,
-        currentDatabase(),
+        test,
         foo,
         key%2,
         'default'
     )
     """)
     # manual only
-    node.query('SYSTEM STOP DISTRIBUTED SENDS dist_foo')
+    node.query('SYSTEM STOP DISTRIBUTED SENDS test.dist_foo')

-    node.query('INSERT INTO dist_foo SELECT * FROM numbers(100)')
+    node.query('INSERT INTO test.dist_foo SELECT * FROM numbers(100)')
     assert _files_in_dist_mon(node, 'disk1', 'dist_foo') == 1
     assert _files_in_dist_mon(node, 'disk2', 'dist_foo') == 0

-    assert node.query('SELECT count() FROM dist_foo') == '100\n'
-    node.query('SYSTEM FLUSH DISTRIBUTED dist_foo')
-    assert node.query('SELECT count() FROM dist_foo') == '200\n'
+    assert node.query('SELECT count() FROM test.dist_foo') == '100\n'
+    node.query('SYSTEM FLUSH DISTRIBUTED test.dist_foo')
+    assert node.query('SELECT count() FROM test.dist_foo') == '200\n'

     #
     # RENAME
     #
-    node.query('RENAME TABLE dist_foo TO dist2_foo')
+    node.query('RENAME TABLE test.dist_foo TO test.dist2_foo')

-    node.query('INSERT INTO dist2_foo SELECT * FROM numbers(100)')
+    node.query('INSERT INTO test.dist2_foo SELECT * FROM numbers(100)')
     assert _files_in_dist_mon(node, 'disk1', 'dist2_foo') == 0
     assert _files_in_dist_mon(node, 'disk2', 'dist2_foo') == 1

-    assert node.query('SELECT count() FROM dist2_foo') == '300\n'
-    node.query('SYSTEM FLUSH DISTRIBUTED dist2_foo')
-    assert node.query('SELECT count() FROM dist2_foo') == '400\n'
+    assert node.query('SELECT count() FROM test.dist2_foo') == '300\n'
+    node.query('SYSTEM FLUSH DISTRIBUTED test.dist2_foo')
+    assert node.query('SELECT count() FROM test.dist2_foo') == '400\n'

View File

@@ -22,6 +22,8 @@ node2 = cluster.add_instance('node2',
 def started_cluster():
     try:
         cluster.start()
+        node1.query('CREATE DATABASE test ENGINE=Ordinary')
+        node2.query('CREATE DATABASE test ENGINE=Ordinary')

         yield cluster
     finally:
@@ -39,7 +41,10 @@ def split_tsv(data):
 def test_merge_simple(started_cluster, replicated):
     try:
         clickhouse_path = "/var/lib/clickhouse"
-        name = "test_merge_simple"
+        db_name = "test"
+        table_name = "merge_simple"
+        name = db_name + "." + table_name
+        table_path = "data/" + db_name + "/" + table_name
         nodes = [node1, node2] if replicated else [node1]
         engine = "ReplicatedMergeTree('/clickhouse/test_merge_simple', '{replica}')" if replicated else "MergeTree()"
         node_check = nodes[-1]
@@ -75,15 +80,15 @@ def test_merge_simple(started_cluster, replicated):
             SELECT database, table, num_parts, source_part_names, source_part_paths, result_part_name, result_part_path, partition_id, is_mutation
             FROM system.merges
             WHERE table = '{name}'
-        """.format(name=name))) == [
+        """.format(name=table_name))) == [
             [
-                "default",
-                name,
+                db_name,
+                table_name,
                 "3",
                 "['{}','{}','{}']".format(*parts),
-                "['{clickhouse}/data/default/{name}/{}/','{clickhouse}/data/default/{name}/{}/','{clickhouse}/data/default/{name}/{}/']".format(*parts, clickhouse=clickhouse_path, name=name),
+                "['{clickhouse}/{table_path}/{}/','{clickhouse}/{table_path}/{}/','{clickhouse}/{table_path}/{}/']".format(*parts, clickhouse=clickhouse_path, table_path=table_path),
                 result_part,
-                "{clickhouse}/data/default/{name}/{}/".format(result_part, clickhouse=clickhouse_path, name=name),
+                "{clickhouse}/{table_path}/{}/".format(result_part, clickhouse=clickhouse_path, table_path=table_path),
                 "all",
                 "0"
             ]
@@ -91,7 +96,7 @@ def test_merge_simple(started_cluster, replicated):
         t.join()
         wait.join()

-        assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=name)) == ""
+        assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=table_name)) == ""
     finally:
         for node in nodes:
@@ -105,7 +110,10 @@ def test_merge_simple(started_cluster, replicated):
 def test_mutation_simple(started_cluster, replicated):
     try:
         clickhouse_path = "/var/lib/clickhouse"
-        name = "test_mutation_simple"
+        db_name = "test"
+        table_name = "mutation_simple"
+        name = db_name + "." + table_name
+        table_path = "data/" + db_name + "/" + table_name
         nodes = [node1, node2] if replicated else [node1]
         engine = "ReplicatedMergeTree('/clickhouse/test_mutation_simple', '{replica}')" if replicated else "MergeTree()"
         node_check = nodes[-1]
@@ -136,15 +144,15 @@ def test_mutation_simple(started_cluster, replicated):
             SELECT database, table, num_parts, source_part_names, source_part_paths, result_part_name, result_part_path, partition_id, is_mutation
             FROM system.merges
             WHERE table = '{name}'
-        """.format(name=name))) == [
+        """.format(name=table_name))) == [
             [
-                "default",
-                name,
+                db_name,
+                table_name,
                 "1",
                 "['{}']".format(part),
-                "['{clickhouse}/data/default/{name}/{}/']".format(part, clickhouse=clickhouse_path, name=name),
+                "['{clickhouse}/{table_path}/{}/']".format(part, clickhouse=clickhouse_path, table_path=table_path),
                 result_part,
-                "{clickhouse}/data/default/{name}/{}/".format(result_part, clickhouse=clickhouse_path, name=name),
+                "{clickhouse}/{table_path}/{}/".format(result_part, clickhouse=clickhouse_path, table_path=table_path),
                 "all",
                 "1"
             ],
@@ -153,7 +161,7 @@ def test_mutation_simple(started_cluster, replicated):
         time.sleep(1.5)

-        assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=name)) == ""
+        assert node_check.query("SELECT * FROM system.merges WHERE table = '{name}'".format(name=table_name)) == ""
     finally:
         for node in nodes:

View File

@@ -1,6 +1,6 @@
-DROP TABLE IF EXISTS test.basic;
+DROP TABLE IF EXISTS test.basic_00040;

-CREATE MATERIALIZED VIEW test.basic
+CREATE MATERIALIZED VIEW test.basic_00040
 ENGINE = AggregatingMergeTree(StartDate, (CounterID, StartDate), 8192)
 POPULATE AS
 SELECT
@@ -16,7 +16,7 @@ SELECT
     StartDate,
     sumMerge(Visits) AS Visits,
     uniqMerge(Users) AS Users
-FROM test.basic
+FROM test.basic_00040
 GROUP BY StartDate
 ORDER BY StartDate;

@@ -25,7 +25,7 @@ SELECT
     StartDate,
     sumMerge(Visits) AS Visits,
     uniqMerge(Users) AS Users
-FROM test.basic
+FROM test.basic_00040
 WHERE CounterID = 942285
 GROUP BY StartDate
 ORDER BY StartDate;

@@ -41,4 +41,4 @@ GROUP BY StartDate
 ORDER BY StartDate;

-DROP TABLE test.basic;
+DROP TABLE test.basic_00040;

View File

@@ -14,8 +14,8 @@ ENGINE = AggregatingMergeTree(StartDate, (CounterID, StartDate), 8192)
 AS SELECT
     CounterID,
     StartDate,
-        sumState(Sign) AS Visits,
-        uniqState(UserID) AS Users
+    sumState(Sign) AS Visits,
+    uniqState(UserID) AS Users
 FROM test.visits_null
 GROUP BY CounterID, StartDate;

@@ -30,8 +30,8 @@ FROM test.visits;

 SELECT
     StartDate,
-        sumMerge(Visits) AS Visits,
-        uniqMerge(Users) AS Users
+    sumMerge(Visits) AS Visits,
+    uniqMerge(Users) AS Users
 FROM test.basic
 GROUP BY StartDate
 ORDER BY StartDate;
@@ -39,8 +39,8 @@ ORDER BY StartDate;

 SELECT
     StartDate,
-        sumMerge(Visits) AS Visits,
-        uniqMerge(Users) AS Users
+    sumMerge(Visits) AS Visits,
+    uniqMerge(Users) AS Users
 FROM test.basic
 WHERE CounterID = 942285
 GROUP BY StartDate
@@ -49,8 +49,8 @@ ORDER BY StartDate;

 SELECT
     StartDate,
-        sum(Sign) AS Visits,
-        uniq(UserID) AS Users
+    sum(Sign) AS Visits,
+    uniq(UserID) AS Users
 FROM test.visits
 WHERE CounterID = 942285
 GROUP BY StartDate