Merge branch 'master' into add-test-43358

This commit is contained in:
Alexey Milovidov 2023-07-05 02:29:44 +03:00 committed by GitHub
commit 431b53a3ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 108 additions and 57 deletions

View File

@ -1,10 +1,11 @@
#include <base/defines.h>
#include <Core/SettingsQuirks.h>
#include <Core/Settings.h>
#include <Poco/Environment.h>
#include <Poco/Platform.h>
#include <Common/VersionNumber.h>
#include <Common/logger_useful.h>
#include <cstdlib>
namespace
{
@ -71,6 +72,12 @@ void applySettingsQuirks(Settings & settings, Poco::Logger * log)
}
}
#if defined(THREAD_SANITIZER)
settings.use_hedged_requests.value = false;
if (log)
LOG_WARNING(log, "use_hedged_requests has been disabled for the build with Thread Sanitizer, because they are using fibers, leading to a failed assertion inside TSan");
#endif
if (!queryProfilerWorks())
{
if (settings.query_profiler_real_time_period_ns)

View File

@ -4529,9 +4529,8 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
}
void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy, DataPartsLock &)
{
auto lock = lockParts();
for (auto original_active_part : getDataPartsStateRange(DataPartState::Active)) // NOLINT (copy is intended)
{
if (part_copy->name == original_active_part->name)
@ -4587,6 +4586,12 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
return getActiveContainingPart(part_info);
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name, DataPartsLock & lock) const
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
return getActiveContainingPart(part_info, DataPartState::Active, lock);
}
MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id) const
{
return getVisibleDataPartsVectorInPartition(local_context->getCurrentTransaction().get(), partition_id);

View File

@ -504,12 +504,13 @@ public:
/// Returns a part in Active state with the given name or a part containing it. If there is no such part, returns nullptr.
DataPartPtr getActiveContainingPart(const String & part_name) const;
DataPartPtr getActiveContainingPart(const String & part_name, DataPartsLock & lock) const;
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const;
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const;
/// Swap part with it's identical copy (possible with another path on another disk).
/// If original part is not active or doesn't exist exception will be thrown.
void swapActivePart(MergeTreeData::DataPartPtr part_copy);
void swapActivePart(MergeTreeData::DataPartPtr part_copy, DataPartsLock &);
/// Returns all parts in specified partition
DataPartsVector getVisibleDataPartsVectorInPartition(MergeTreeTransaction * txn, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;

View File

@ -263,7 +263,10 @@ void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) cons
if (moves_blocker.isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
auto active_part = data->getActiveContainingPart(cloned_part.part->name);
/// `getActiveContainingPart` and `swapActivePart` are called under the same lock
/// to prevent part becoming inactive between calls
auto part_lock = data->lockParts();
auto active_part = data->getActiveContainingPart(cloned_part.part->name, part_lock);
/// It's ok, because we don't block moving parts for merges or mutations
if (!active_part || active_part->name != cloned_part.part->name)
@ -284,7 +287,7 @@ void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) cons
cloned_part.part->renameTo(active_part->name, false);
/// TODO what happen if server goes down here?
data->swapActivePart(cloned_part.part);
data->swapActivePart(cloned_part.part, part_lock);
LOG_TRACE(log, "Part {} was moved to {}", cloned_part.part->name, cloned_part.part->getDataPartStorage().getFullPath());

View File

@ -5,4 +5,5 @@
<!-- Default is 60 seconds, but let's make tests more aggressive -->
<merge_tree_clear_old_temporary_directories_interval_seconds>5</merge_tree_clear_old_temporary_directories_interval_seconds>
</merge_tree>
<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
</clickhouse>

View File

@ -203,6 +203,9 @@ def update_configs(
def test_stuck_replica(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs()
cluster.pause_container("node_1")
@ -233,6 +236,9 @@ def test_stuck_replica(started_cluster):
def test_long_query(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs()
# Restart to reset pool states.
@ -249,12 +255,18 @@ def test_long_query(started_cluster):
def test_send_table_status_sleep(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(node_1_sleep_in_send_tables_status=sleep_time)
check_query(expected_replica="node_2")
check_changing_replica_events(1)
def test_send_table_status_sleep2(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_in_send_tables_status=sleep_time,
node_2_sleep_in_send_tables_status=sleep_time,
@ -264,12 +276,18 @@ def test_send_table_status_sleep2(started_cluster):
def test_send_data(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(node_1_sleep_in_send_data=sleep_time)
check_query(expected_replica="node_2")
check_changing_replica_events(1)
def test_send_data2(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_in_send_data=sleep_time, node_2_sleep_in_send_data=sleep_time
)
@ -278,6 +296,9 @@ def test_send_data2(started_cluster):
def test_combination1(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_in_send_tables_status=sleep_time,
node_2_sleep_in_send_data=sleep_time,
@ -287,6 +308,9 @@ def test_combination1(started_cluster):
def test_combination2(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_in_send_data=sleep_time,
node_2_sleep_in_send_tables_status=sleep_time,
@ -296,6 +320,9 @@ def test_combination2(started_cluster):
def test_combination3(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_in_send_data=sleep_time,
node_2_sleep_in_send_tables_status=1000,
@ -306,6 +333,9 @@ def test_combination3(started_cluster):
def test_combination4(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_in_send_tables_status=1000,
node_1_sleep_in_send_data=sleep_time,
@ -317,6 +347,9 @@ def test_combination4(started_cluster):
def test_receive_timeout1(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
# Check the situation when first two replicas get receive timeout
# in establishing connection, but the third replica is ok.
update_configs(
@ -329,6 +362,9 @@ def test_receive_timeout1(started_cluster):
def test_receive_timeout2(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
# Check the situation when first replica get receive timeout
# in packet receiving but there are replicas in process of
# connection establishing.
@ -342,6 +378,9 @@ def test_receive_timeout2(started_cluster):
def test_initial_receive_timeout(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
# Check the situation when replicas don't respond after
# receiving query (so, no packets were send to initiator)
update_configs(
@ -360,6 +399,9 @@ def test_initial_receive_timeout(started_cluster):
def test_async_connect(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs()
NODES["node"].restart_clickhouse()
@ -390,6 +432,9 @@ def test_async_connect(started_cluster):
def test_async_query_sending(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_after_receiving_query=5000,
node_2_sleep_after_receiving_query=5000,

View File

@ -172,6 +172,9 @@ def update_configs(
def test_send_table_status_sleep(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_in_send_tables_status=sleep_time,
node_2_sleep_in_send_tables_status=sleep_time,
@ -181,6 +184,9 @@ def test_send_table_status_sleep(started_cluster):
def test_send_data(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_in_send_data=sleep_time, node_2_sleep_in_send_data=sleep_time
)
@ -189,6 +195,9 @@ def test_send_data(started_cluster):
def test_combination1(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_in_send_tables_status=1000,
node_2_sleep_in_send_tables_status=1000,
@ -199,6 +208,9 @@ def test_combination1(started_cluster):
def test_combination2(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_in_send_data=sleep_time,
node_2_sleep_in_send_tables_status=1000,
@ -210,6 +222,9 @@ def test_combination2(started_cluster):
def test_query_with_no_data_to_sample(started_cluster):
if NODES["node"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
update_configs(
node_1_sleep_in_send_data=sleep_time, node_2_sleep_in_send_data=sleep_time
)

View File

@ -58,6 +58,9 @@ def test(started_cluster):
config.format(sleep_in_send_data_ms=1000000),
)
if NODES["node1"].is_built_with_thread_sanitizer():
pytest.skip("Hedged requests don't work under Thread Sanitizer")
attempts = 0
while attempts < 1000:
setting = NODES["node2"].http_query(

View File

@ -1,45 +0,0 @@
<test>
<substitutions>
<substitution>
<name>table_size</name>
<values>
<value>100000000</value>
</values>
</substitution>
</substitutions>
<settings>
<join_algorithm>full_sorting_merge</join_algorithm>
</settings>
<create_query>
CREATE TABLE t1 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT
sipHash64(number, 't1_x') % {table_size} AS x,
sipHash64(number, 't1_y') % {table_size} AS y
FROM numbers({table_size})
</create_query>
<create_query>
CREATE TABLE t2 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT
sipHash64(number, 't2_x') % {table_size} AS x,
sipHash64(number, 't2_y') % {table_size} AS y
FROM numbers({table_size})
</create_query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE less(t1.y, 10000)</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE less(t1.y, 10000)</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE greater(t1.y, {table_size} - 10000)</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE greater(t1.y, {table_size} - 10000)</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 100 = 0</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE t1.y % 100 = 0</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 1000 = 0</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE t1.y % 1000 = 0</query>
<drop_query>DROP TABLE IF EXISTS t1</drop_query>
<drop_query>DROP TABLE IF EXISTS t2</drop_query>
</test>

View File

@ -5,4 +5,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --connections_with_failover_max_tries 10 --query "SELECT hostName() FROM remote('128.1.2.3', default.tmp)" 2>&1 | grep -o -P 'Timeout exceeded while connecting to socket|Network is unreachable' | wc -l
$CLICKHOUSE_CLIENT --connections_with_failover_max_tries 10 --query "SELECT hostName() FROM remote('128.1.2.3', default.tmp)" 2>&1 | grep -o -P 'Timeout exceeded while connecting to socket|Network is unreachable|Timeout: connect timed out' | wc -l

View File

@ -1 +1,2 @@
-- Tags: no-tsan
select number from remote('127.0.0.{3|2}', numbers(2)) where number global in (select number from numbers(1)) settings async_socket_for_remote=1, use_hedged_requests = 1, sleep_in_send_data_ms=10, receive_data_timeout_ms=1;

View File

@ -1,10 +1,10 @@
255.255.255.255
HedgedConnectionsFactory: Connection failed at try №1
ConnectionPoolWithFailover: Connection failed at try №1
executeQuery: Code: 519.: All attempts to get table structure failed.
127.2,255.255.255.255
0
HedgedConnectionsFactory: Connection failed at try №1
ConnectionPoolWithFailover: Connection failed at try №1
255.255.255.255,127.2
0
HedgedConnectionsFactory: Connection failed at try №1
HedgedConnectionsFactory: Connection failed at try №1
ConnectionPoolWithFailover: Connection failed at try №1
ConnectionPoolWithFailover: Connection failed at try №1

View File

@ -25,7 +25,7 @@ function execute_query()
# clickhouse-client 2> >(wc -l)
#
# May dump output of "wc -l" after some other programs.
$CLICKHOUSE_CLIENT "${opts[@]}" --query "select * from remote('$hosts', system.one)" 2>"$stderr"
$CLICKHOUSE_CLIENT "${opts[@]}" --query "select * from remote('$hosts', system.one) settings use_hedged_requests=0" 2>"$stderr"
process_log_safe "$stderr"
}
execute_query 255.255.255.255

View File

@ -17,6 +17,8 @@ opts=(
--allow_experimental_parallel_reading_from_replicas 1
--parallel_replicas_for_non_replicated_merge_tree 1
--max_parallel_replicas 3
--use_hedged_requests 0
--cluster_for_parallel_replicas parallel_replicas
--iterations 1
)

View File

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS lc_nullable_string;
CREATE TABLE lc_nullable_string(`c1` LowCardinality(Nullable(String)) DEFAULT CAST(NULL, 'LowCardinality(Nullable(String))'))
ENGINE = Memory;
INSERT INTO lc_nullable_string (c1) FORMAT Values (0);
INSERT INTO lc_nullable_string (c1) Values (1);
SELECT * FROM lc_nullable_string ORDER BY c1;
DROP TABLE lc_nullable_string;