mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 09:10:48 +00:00
Merge branch 'master' into shellcheck1
This commit is contained in:
commit
6b9cf59dc0
@ -1323,7 +1323,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
|
||||
/// Try start processing, create node about it
|
||||
{
|
||||
String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
|
||||
CleanStateClock new_clean_state_clock(zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
|
||||
CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
|
||||
if (clean_state_clock != new_clean_state_clock)
|
||||
{
|
||||
LOG_INFO(log, "Partition {} piece {} clean state changed, cowardly bailing", task_partition.name, toString(current_piece_number));
|
||||
@ -1360,8 +1360,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
|
||||
|
||||
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
|
||||
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
|
||||
LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}",
|
||||
getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
|
||||
LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
|
||||
}
|
||||
|
||||
/// Do the copying
|
||||
@ -1393,17 +1392,17 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
|
||||
try
|
||||
{
|
||||
/// Custom INSERT SELECT implementation
|
||||
Context context_select = context;
|
||||
context_select.setSettings(task_cluster->settings_pull);
|
||||
|
||||
Context context_insert = context;
|
||||
context_insert.setSettings(task_cluster->settings_push);
|
||||
|
||||
BlockInputStreamPtr input;
|
||||
BlockOutputStreamPtr output;
|
||||
{
|
||||
std::unique_ptr<Context> context_select = std::make_unique<Context>(context);
|
||||
context_select->setSettings(task_cluster->settings_pull);
|
||||
|
||||
std::unique_ptr<Context> context_insert = std::make_unique<Context>(context);
|
||||
context_insert->setSettings(task_cluster->settings_push);
|
||||
|
||||
BlockIO io_select = InterpreterFactory::get(query_select_ast, *context_select)->execute();
|
||||
BlockIO io_insert = InterpreterFactory::get(query_insert_ast, *context_insert)->execute();
|
||||
BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
|
||||
BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
|
||||
|
||||
input = io_select.getInputStream();
|
||||
output = io_insert.out;
|
||||
|
@ -0,0 +1,10 @@
|
||||
1
|
||||
|
||||
---
|
||||
|
||||
0
|
||||
1
|
||||
|
||||
---
|
||||
|
||||
0
|
43
tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes.sh
Executable file
43
tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes.sh
Executable file
@ -0,0 +1,43 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. $CURDIR/../shell_config.sh
|
||||
|
||||
# Check that if we have one inactive replica and a huge number of INSERTs to active replicas,
|
||||
# the number of nodes in ZooKeeper does not grow unbounded.
|
||||
|
||||
$CLICKHOUSE_CLIENT -n --query "
|
||||
DROP TABLE IF EXISTS r1;
|
||||
DROP TABLE IF EXISTS r2;
|
||||
CREATE TABLE r1 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/r', '1') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
|
||||
CREATE TABLE r2 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/r', '2') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
|
||||
DETACH TABLE r2;
|
||||
"
|
||||
|
||||
$CLICKHOUSE_CLIENT --max_block_size 1 --min_insert_block_size_rows 1 --min_insert_block_size_bytes 1 --max_insert_threads 16 --query "INSERT INTO r1 SELECT * FROM numbers_mt(10000)"
|
||||
|
||||
|
||||
# Now wait for cleanup thread
|
||||
|
||||
for i in {1..60}; do
|
||||
$CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS"
|
||||
[[ $($CLICKHOUSE_CLIENT --query "SELECT sum(toUInt32(extract(message, 'Removed (\d+) old log entries'))) FROM system.text_log WHERE event_date >= yesterday() AND logger_name LIKE '%' || currentDatabase() || '%r1%(ReplicatedMergeTreeCleanupThread)%' AND message LIKE '%Removed % old log entries%'") -gt 9900 ]] && break;
|
||||
sleep 1
|
||||
done
|
||||
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SELECT numChildren < 2500 FROM system.zookeeper WHERE path = '/clickhouse/tables/r' AND name = 'log'";
|
||||
echo -e '\n---\n';
|
||||
$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/r/replicas/1' AND name = 'is_lost'";
|
||||
$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/r/replicas/2' AND name = 'is_lost'";
|
||||
echo -e '\n---\n';
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "ATTACH TABLE r2"
|
||||
$CLICKHOUSE_CLIENT --receive_timeout 600 --query "SYSTEM SYNC REPLICA r2" # Need to increase timeout, otherwise it timed out in debug build
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/r/replicas/2' AND name = 'is_lost'";
|
||||
|
||||
$CLICKHOUSE_CLIENT -n --query "
|
||||
DROP TABLE IF EXISTS r1;
|
||||
DROP TABLE IF EXISTS r2;
|
||||
"
|
Loading…
Reference in New Issue
Block a user