Merge pull request #17311 from ClickHouse/fix_integration_tests

Fix some flaky tests
This commit is contained in:
alesapin 2020-11-25 10:09:27 +03:00 committed by GitHub
commit 3e1b2f515b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 78 additions and 66 deletions

View File

@ -16,7 +16,7 @@ namespace DB
RabbitMQBlockInputStream::RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
std::shared_ptr<Context> context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix_)
@ -54,7 +54,7 @@ Block RabbitMQBlockInputStream::getHeader() const
void RabbitMQBlockInputStream::readPrefixImpl()
{
auto timeout = std::chrono::milliseconds(context.getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
auto timeout = std::chrono::milliseconds(context->getSettingsRef().rabbitmq_max_wait_ms.totalMilliseconds());
buffer = storage.popReadBuffer(timeout);
}
@ -96,7 +96,7 @@ Block RabbitMQBlockInputStream::readImpl()
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);

View File

@ -15,7 +15,7 @@ public:
RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
std::shared_ptr<Context> context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix = true);
@ -37,7 +37,7 @@ public:
private:
StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
const Context & context;
std::shared_ptr<Context> context;
Names column_names;
const size_t max_block_size;
bool ack_in_suffix;

View File

@ -74,7 +74,6 @@ StorageRabbitMQ::StorageRabbitMQ(
std::unique_ptr<RabbitMQSettings> rabbitmq_settings_)
: IStorage(table_id_)
, global_context(context_.getGlobalContext())
, rabbitmq_context(Context(global_context))
, rabbitmq_settings(std::move(rabbitmq_settings_))
, exchange_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_exchange_name.value))
, format_name(global_context.getMacros()->expand(rabbitmq_settings->rabbitmq_format.value))
@ -114,8 +113,8 @@ StorageRabbitMQ::StorageRabbitMQ(
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
rabbitmq_context.makeQueryContext();
rabbitmq_context = addSettings(rabbitmq_context);
rabbitmq_context = addSettings(global_context);
rabbitmq_context->makeQueryContext();
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
event_handler->updateLoopState(Loop::STOP);
@ -193,16 +192,17 @@ String StorageRabbitMQ::getTableBasedName(String name, const StorageID & table_i
}
Context StorageRabbitMQ::addSettings(Context context) const
std::shared_ptr<Context> StorageRabbitMQ::addSettings(const Context & context) const
{
context.setSetting("input_format_skip_unknown_fields", true);
context.setSetting("input_format_allow_errors_ratio", 0.);
context.setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
auto modified_context = std::make_shared<Context>(context);
modified_context->setSetting("input_format_skip_unknown_fields", true);
modified_context->setSetting("input_format_allow_errors_ratio", 0.);
modified_context->setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value);
if (!schema_name.empty())
context.setSetting("format_schema", schema_name);
modified_context->setSetting("format_schema", schema_name);
return context;
return modified_context;
}
@ -538,6 +538,7 @@ Pipe StorageRabbitMQ::read(
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
auto modified_context = addSettings(context);
auto block_size = getMaxBlockSize();
bool update_channels = false;
@ -581,7 +582,9 @@ Pipe StorageRabbitMQ::read(
looping_task->activateAndSchedule();
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
return Pipe::unitePipes(std::move(pipes));
auto united_pipe = Pipe::unitePipes(std::move(pipes));
united_pipe.addInterpreterContext(modified_context);
return united_pipe;
}
@ -785,7 +788,7 @@ bool StorageRabbitMQ::streamToViews()
insert->table_id = table_id;
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
InterpreterInsertQuery interpreter(insert, *rabbitmq_context, false, true, true);
auto block_io = interpreter.execute();
auto metadata_snapshot = getInMemoryMetadataPtr();

View File

@ -73,7 +73,7 @@ protected:
private:
const Context & global_context;
Context rabbitmq_context;
std::shared_ptr<Context> rabbitmq_context;
std::unique_ptr<RabbitMQSettings> rabbitmq_settings;
const String exchange_name;
@ -135,7 +135,7 @@ private:
static AMQP::ExchangeType defineExchangeType(String exchange_type_);
static String getTableBasedName(String name, const StorageID & table_id);
Context addSettings(Context context) const;
std::shared_ptr<Context> addSettings(const Context & context) const;
size_t getMaxBlockSize() const;
void deactivateTask(BackgroundSchedulePool::TaskHolder & task, bool wait, bool stop_loop);

View File

@ -228,10 +228,3 @@ def test_default_codec_version_update(start_cluster):
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '2_2_2_1'") == "LZ4HC(5)\n"
assert node3.query(
"SELECT default_compression_codec FROM system.parts WHERE table = 'compression_table' and name = '3_3_3_1'") == "LZ4\n"
assert get_compression_codec_byte(node1, "compression_table_multiple", "2_0_0_1") == CODECS_MAPPING['Multiple']
assert get_second_multiple_codec_byte(node1, "compression_table_multiple", "2_0_0_1") == CODECS_MAPPING['LZ4HC']
assert get_compression_codec_byte(node1, "compression_table_multiple", "3_0_0_1") == CODECS_MAPPING['Multiple']
assert get_second_multiple_codec_byte(node1, "compression_table_multiple", "3_0_0_1") == CODECS_MAPPING['LZ4']
assert node1.query("SELECT COUNT() FROM compression_table_multiple") == "3\n"
assert node2.query("SELECT COUNT() FROM compression_table_multiple") == "3\n"

View File

@ -4,6 +4,7 @@ import psycopg2
import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
@ -200,26 +201,43 @@ def test_sqlite_odbc_hashed_dictionary(started_cluster):
node1.exec_in_container(["bash", "-c", "echo 'INSERT INTO t2 values(1, 2, 3);' | sqlite3 {}".format(sqlite_db)],
privileged=True, user='root')
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))") == "3\n"
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))") == "1\n" # default
node1.query("SYSTEM RELOAD DICTIONARY sqlite3_odbc_hashed")
first_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'")
print("First update time", first_update_time)
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "3")
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "1") # default
second_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'")
# Reloaded with new data
print("Second update time", second_update_time)
while first_update_time == second_update_time:
second_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'")
print("Waiting dictionary to update for the second time")
time.sleep(0.1)
time.sleep(5) # first reload
node1.exec_in_container(["bash", "-c", "echo 'INSERT INTO t2 values(200, 2, 7);' | sqlite3 {}".format(sqlite_db)],
privileged=True, user='root')
# No reload because of invalidate query
time.sleep(5)
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))") == "3\n"
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))") == "1\n" # still default
third_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'")
print("Third update time", second_update_time)
counter = 0
while third_update_time == second_update_time:
third_update_time = node1.query("SELECT last_successful_update_time FROM system.dictionaries WHERE name = 'sqlite3_odbc_hashed'")
time.sleep(0.1)
if counter > 50:
break
counter += 1
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "3")
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "1") # still default
node1.exec_in_container(["bash", "-c", "echo 'REPLACE INTO t2 values(1, 2, 5);' | sqlite3 {}".format(sqlite_db)],
privileged=True, user='root')
# waiting for reload
time.sleep(5)
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))") == "5\n"
assert node1.query("select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))") == "7\n" # new value
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(1))", "5")
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_hashed', 'Z', toUInt64(200))", "7")
def test_sqlite_odbc_cached_dictionary(started_cluster):
@ -241,18 +259,16 @@ def test_sqlite_odbc_cached_dictionary(started_cluster):
node1.exec_in_container(["bash", "-c", "echo 'REPLACE INTO t3 values(1, 2, 12);' | sqlite3 {}".format(sqlite_db)],
privileged=True, user='root')
time.sleep(5)
assert node1.query("select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))") == "12\n"
assert_eq_with_retry(node1, "select dictGetUInt8('sqlite3_odbc_cached', 'Z', toUInt64(1))", "12")
def test_postgres_odbc_hached_dictionary_with_schema(started_cluster):
conn = get_postgres_conn()
cursor = conn.cursor()
cursor.execute("insert into clickhouse.test_table values(1, 'hello'),(2, 'world')")
time.sleep(5)
assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))") == "hello\n"
assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))") == "world\n"
node1.query("SYSTEM RELOAD DICTIONARY postgres_odbc_hashed")
assert_eq_with_retry(node1, "select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(1))", "hello")
assert_eq_with_retry(node1, "select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(2))", "world")
def test_postgres_odbc_hached_dictionary_no_tty_pipe_overflow(started_cluster):
@ -265,7 +281,7 @@ def test_postgres_odbc_hached_dictionary_no_tty_pipe_overflow(started_cluster):
except Exception as ex:
assert False, "Exception occured -- odbc-bridge hangs: " + str(ex)
assert node1.query("select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(3))") == "xxx\n"
assert_eq_with_retry(node1, "select dictGetString('postgres_odbc_hashed', 'column2', toUInt64(3))", "xxx")
def test_postgres_insert(started_cluster):
@ -310,7 +326,7 @@ def test_bridge_dies_with_parent(started_cluster):
clickhouse_pid = node1.get_process_pid("clickhouse server")
time.sleep(1)
for i in range(5):
for i in range(30):
time.sleep(1) # just for sure, that odbc-bridge caught signal
bridge_pid = node1.get_process_pid("odbc-bridge")
if bridge_pid is None:

View File

@ -123,7 +123,7 @@ def rabbitmq_setup_teardown():
# Tests
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_select(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@ -159,7 +159,7 @@ def test_rabbitmq_select(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_select_empty(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@ -173,7 +173,7 @@ def test_rabbitmq_select_empty(rabbitmq_cluster):
assert int(instance.query('SELECT count() FROM test.rabbitmq')) == 0
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_json_without_delimiter(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@ -215,7 +215,7 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@ -250,7 +250,7 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
instance.query('''
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
@ -285,7 +285,7 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_materialized_view(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
@ -328,7 +328,7 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
@ -371,7 +371,7 @@ def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster):
rabbitmq_check_result(result, True)
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view1;
@ -426,7 +426,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
@pytest.mark.skip(reason="clichouse_path with rabbitmq.proto fails to be exported")
@pytest.mark.timeout(180)
@pytest.mark.timeout(240)
def test_rabbitmq_protobuf(rabbitmq_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;

View File

@ -35,7 +35,7 @@ def test_ttl_columns(started_cluster):
node.query(
'''
CREATE TABLE test_ttl(date DateTime, id UInt32, a Int32 TTL date + INTERVAL 1 DAY, b Int32 TTL date + INTERVAL 1 MONTH)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_columns', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date) SETTINGS merge_with_ttl_timeout=0, min_bytes_for_wide_part=0;
'''.format(replica=node.name))
@ -155,7 +155,7 @@ def test_modify_ttl(started_cluster):
node.query(
'''
CREATE TABLE test_ttl(d DateTime, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_modify', '{replica}')
ORDER BY id
'''.format(replica=node.name))
@ -179,7 +179,7 @@ def test_modify_column_ttl(started_cluster):
node.query(
'''
CREATE TABLE test_ttl(d DateTime, id UInt32 DEFAULT 42)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_column', '{replica}')
ORDER BY d
'''.format(replica=node.name))
@ -202,7 +202,7 @@ def test_ttl_double_delete_rule_returns_error(started_cluster):
try:
node1.query('''
CREATE TABLE test_ttl(date DateTime, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_double_delete', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 1 DAY, date + INTERVAL 2 DAY SETTINGS merge_with_ttl_timeout=0
'''.format(replica=node1.name))
@ -288,7 +288,7 @@ def test_ttl_empty_parts(started_cluster):
node.query(
'''
CREATE TABLE test_ttl_empty_parts(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_empty_parts', '{replica}')
ORDER BY id
SETTINGS max_bytes_to_merge_at_min_space_in_pool = 1, max_bytes_to_merge_at_max_space_in_pool = 1,
cleanup_delay_period = 1, cleanup_delay_period_random_add = 0

View File

@ -7,11 +7,11 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
function wait_mutation_to_start()
{
query_wait=$($CLICKHOUSE_CLIENT --query="SELECT length(parts_to_do_names) FROM system.mutations where table = '$1'" 2>&1)
query_wait=$($CLICKHOUSE_CLIENT --query="SELECT length(parts_to_do_names) FROM system.mutations where table = '$1' and database='${CLICKHOUSE_DATABASE}'" 2>&1)
while [ "$query_wait" == "0" ] || [ -z "$query_wait" ]
do
query_wait=$($CLICKHOUSE_CLIENT --query="SELECT length(parts_to_do_names) FROM system.mutations where table = '$1'" 2>&1)
query_wait=$($CLICKHOUSE_CLIENT --query="SELECT length(parts_to_do_names) FROM system.mutations where table = '$1' and database='${CLICKHOUSE_DATABASE}'" 2>&1)
sleep 0.5
done
}
@ -20,7 +20,7 @@ ${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS table_for_mutations"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE table_for_mutations(k UInt32, v1 UInt64) ENGINE MergeTree ORDER BY k PARTITION BY modulo(k, 2)"
${CLICKHOUSE_CLIENT} --query="SYSTEM STOP MERGES"
${CLICKHOUSE_CLIENT} --query="SYSTEM STOP MERGES table_for_mutations"
${CLICKHOUSE_CLIENT} --query="INSERT INTO table_for_mutations select number, number from numbers(100000)"
@ -32,7 +32,7 @@ ${CLICKHOUSE_CLIENT} --query="SELECT is_done, parts_to_do_names, parts_to_do FRO
wait_mutation_to_start "table_for_mutations"
${CLICKHOUSE_CLIENT} --query="SYSTEM START MERGES"
${CLICKHOUSE_CLIENT} --query="SYSTEM START MERGES table_for_mutations"
wait_for_mutation "table_for_mutations" "mutation_3.txt"
@ -47,7 +47,7 @@ ${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS replicated_table_for_mutation
${CLICKHOUSE_CLIENT} --query="CREATE TABLE replicated_table_for_mutations(k UInt32, v1 UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/test_01045/replicated_table_for_mutations', '1') ORDER BY k PARTITION BY modulo(k, 2)"
${CLICKHOUSE_CLIENT} --query="SYSTEM STOP MERGES"
${CLICKHOUSE_CLIENT} --query="SYSTEM STOP MERGES replicated_table_for_mutations"
${CLICKHOUSE_CLIENT} --query="INSERT INTO replicated_table_for_mutations select number, number from numbers(100000)"
@ -59,7 +59,7 @@ wait_mutation_to_start "replicated_table_for_mutations"
${CLICKHOUSE_CLIENT} --query="SELECT is_done, parts_to_do_names, parts_to_do FROM system.mutations where table = 'replicated_table_for_mutations'"
${CLICKHOUSE_CLIENT} --query="SYSTEM START MERGES"
${CLICKHOUSE_CLIENT} --query="SYSTEM START MERGES replicated_table_for_mutations"
wait_for_mutation "replicated_table_for_mutations" "0000000000"

View File

@ -1,7 +1,7 @@
DROP TABLE IF EXISTS mt;
CREATE TABLE mt (v UInt8) ENGINE = MergeTree() order by tuple();
SYSTEM STOP MERGES;
SYSTEM STOP MERGES mt;
INSERT INTO mt VALUES (0);
INSERT INTO mt VALUES (1);
@ -32,7 +32,7 @@ ALTER TABLE mt ATTACH PART 'all_4_4_0'; -- { serverError 233 }
SELECT v FROM mt ORDER BY v;
SELECT '-- resume merges --';
SYSTEM START MERGES;
SYSTEM START MERGES mt;
OPTIMIZE TABLE mt FINAL;
SELECT v FROM mt ORDER BY v;