2020-12-03 17:08:17 +00:00
# include <filesystem>
2017-04-13 13:42:29 +00:00
# include <Interpreters/DDLWorker.h>
2020-11-03 13:47:26 +00:00
# include <Interpreters/DDLTask.h>
2017-04-13 16:12:56 +00:00
# include <Parsers/ASTAlterQuery.h>
2019-06-26 14:52:20 +00:00
# include <Parsers/ASTDropQuery.h>
# include <Parsers/ASTOptimizeQuery.h>
2017-04-21 12:39:28 +00:00
# include <Parsers/ASTQueryWithOnCluster.h>
2019-06-26 14:52:20 +00:00
# include <Parsers/ASTQueryWithTableAndOutput.h>
2022-07-25 16:00:54 +00:00
# include <Parsers/ASTCreateIndexQuery.h>
# include <Parsers/ASTDropIndexQuery.h>
2017-04-25 15:21:03 +00:00
# include <Parsers/ParserQuery.h>
# include <Parsers/parseQuery.h>
# include <Parsers/queryToString.h>
2017-04-17 17:04:31 +00:00
# include <IO/WriteHelpers.h>
# include <IO/ReadHelpers.h>
# include <IO/ReadBufferFromString.h>
2017-04-25 15:21:03 +00:00
# include <Storages/IStorage.h>
2017-04-13 13:42:29 +00:00
# include <Interpreters/executeQuery.h>
2017-04-13 16:12:56 +00:00
# include <Interpreters/Cluster.h>
2020-05-20 20:16:32 +00:00
# include <Interpreters/Context.h>
2017-04-27 15:19:11 +00:00
# include <Common/setThreadName.h>
2017-09-09 23:17:38 +00:00
# include <Common/randomSeed.h>
2020-03-12 14:36:54 +00:00
# include <Common/ZooKeeper/ZooKeeper.h>
# include <Common/ZooKeeper/KeeperException.h>
2022-01-17 11:52:51 +00:00
# include <Common/ZooKeeper/ZooKeeperLock.h>
2020-03-12 14:36:54 +00:00
# include <Common/isLocalAddress.h>
2019-06-26 14:52:20 +00:00
# include <Storages/StorageReplicatedMergeTree.h>
2017-04-19 14:21:27 +00:00
# include <Poco/Timestamp.h>
2021-10-02 07:13:14 +00:00
# include <base/sleep.h>
# include <base/getFQDNOrHostName.h>
2022-04-27 15:05:45 +00:00
# include <Common/logger_useful.h>
2022-01-30 19:49:48 +00:00
# include <base/sort.h>
2017-09-09 23:17:38 +00:00
# include <random>
# include <pcg_random.hpp>
2022-04-27 15:05:45 +00:00
# include <Common/scope_guard_safe.h>
2017-09-09 23:17:38 +00:00
2021-07-09 14:05:35 +00:00
# include <Interpreters/ZooKeeperLog.h>
2020-12-03 17:04:24 +00:00
namespace fs = std : : filesystem ;
2017-04-13 13:42:29 +00:00
namespace DB
{
namespace ErrorCodes
{
2018-10-25 09:40:30 +00:00
extern const int LOGICAL_ERROR ;
2017-04-27 15:19:11 +00:00
extern const int TIMEOUT_EXCEEDED ;
2017-08-02 14:42:35 +00:00
extern const int UNFINISHED ;
2020-12-04 20:12:32 +00:00
extern const int NOT_A_LEADER ;
2022-04-28 19:39:45 +00:00
extern const int TABLE_IS_READ_ONLY ;
2020-12-04 20:12:32 +00:00
extern const int KEEPER_EXCEPTION ;
extern const int CANNOT_ASSIGN_ALTER ;
extern const int CANNOT_ALLOCATE_MEMORY ;
extern const int MEMORY_LIMIT_EXCEEDED ;
2017-04-13 13:42:29 +00:00
}
2021-03-04 23:17:07 +00:00
constexpr const char * TASK_PROCESSED_OUT_REASON = " Task has been already processed " ;
2017-04-13 13:42:29 +00:00
2020-08-18 19:02:07 +00:00
2021-04-10 23:33:54 +00:00
DDLWorker : : DDLWorker (
int pool_size_ ,
const std : : string & zk_root_dir ,
ContextPtr context_ ,
const Poco : : Util : : AbstractConfiguration * config ,
const String & prefix ,
const String & logger_name ,
2021-08-11 03:40:06 +00:00
const CurrentMetrics : : Metric * max_entry_metric_ ,
const CurrentMetrics : : Metric * max_pushed_entry_metric_ )
2021-04-10 23:33:54 +00:00
: context ( Context : : createCopy ( context_ ) )
2020-11-27 14:04:03 +00:00
, log ( & Poco : : Logger : : get ( logger_name ) )
2020-12-03 18:14:27 +00:00
, pool_size ( pool_size_ )
2021-02-15 10:26:34 +00:00
, max_entry_metric ( max_entry_metric_ )
2021-08-11 03:40:06 +00:00
, max_pushed_entry_metric ( max_pushed_entry_metric_ )
2017-04-13 16:12:56 +00:00
{
2021-02-15 10:26:34 +00:00
if ( max_entry_metric )
CurrentMetrics : : set ( * max_entry_metric , 0 ) ;
2021-01-18 14:09:39 +00:00
2021-08-11 03:40:06 +00:00
if ( max_pushed_entry_metric )
CurrentMetrics : : set ( * max_pushed_entry_metric , 0 ) ;
2020-12-03 18:14:27 +00:00
if ( 1 < pool_size )
{
LOG_WARNING ( log , " DDLWorker is configured to use multiple threads. "
" It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear. " ) ;
2021-01-18 14:09:39 +00:00
worker_pool = std : : make_unique < ThreadPool > ( pool_size ) ;
2020-12-03 18:14:27 +00:00
}
2020-09-02 22:35:47 +00:00
2017-04-27 15:19:11 +00:00
queue_dir = zk_root_dir ;
if ( queue_dir . back ( ) = = ' / ' )
queue_dir . resize ( queue_dir . size ( ) - 1 ) ;
2017-04-13 13:42:29 +00:00
2017-08-01 14:41:00 +00:00
if ( config )
{
2017-08-14 05:44:04 +00:00
task_max_lifetime = config - > getUInt64 ( prefix + " .task_max_lifetime " , static_cast < UInt64 > ( task_max_lifetime ) ) ;
cleanup_delay_period = config - > getUInt64 ( prefix + " .cleanup_delay_period " , static_cast < UInt64 > ( cleanup_delay_period ) ) ;
2019-03-12 12:06:17 +00:00
max_tasks_in_queue = std : : max < UInt64 > ( 1 , config - > getUInt64 ( prefix + " .max_tasks_in_queue " , max_tasks_in_queue ) ) ;
2018-02-01 13:52:29 +00:00
if ( config - > has ( prefix + " .profile " ) )
2021-04-10 23:33:54 +00:00
context - > setSetting ( " profile " , config - > getString ( prefix + " .profile " ) ) ;
2018-02-01 13:52:29 +00:00
}
2021-04-10 23:33:54 +00:00
if ( context - > getSettingsRef ( ) . readonly )
2018-02-01 13:52:29 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_WARNING ( log , " Distributed DDL worker is run with readonly settings, it will not be able to execute DDL queries Set appropriate system_profile or distributed_ddl.profile to fix this. " ) ;
2017-08-01 14:41:00 +00:00
}
2017-07-26 19:31:32 +00:00
host_fqdn = getFQDNOrHostName ( ) ;
2021-04-10 23:33:54 +00:00
host_fqdn_id = Cluster : : Address : : toString ( host_fqdn , context - > getTCPPort ( ) ) ;
2020-12-01 17:20:42 +00:00
}
2017-04-13 13:42:29 +00:00
2020-12-01 17:20:42 +00:00
void DDLWorker : : startup ( )
{
2019-02-22 18:36:36 +00:00
main_thread = ThreadFromGlobalPool ( & DDLWorker : : runMainThread , this ) ;
cleanup_thread = ThreadFromGlobalPool ( & DDLWorker : : runCleanupThread , this ) ;
2017-04-13 13:42:29 +00:00
}
2020-11-13 18:35:45 +00:00
void DDLWorker : : shutdown ( )
2017-04-13 13:42:29 +00:00
{
2021-02-19 23:41:58 +00:00
bool prev_stop_flag = stop_flag . exchange ( true ) ;
if ( ! prev_stop_flag )
{
queue_updated_event - > set ( ) ;
cleanup_event - > set ( ) ;
2020-12-01 17:20:42 +00:00
main_thread . join ( ) ;
cleanup_thread . join ( ) ;
2021-02-19 23:41:58 +00:00
worker_pool . reset ( ) ;
}
2017-04-13 13:42:29 +00:00
}
2021-01-25 18:59:23 +00:00
DDLWorker : : ~ DDLWorker ( )
{
2021-02-15 00:04:46 +00:00
DDLWorker : : shutdown ( ) ;
2021-01-25 18:59:23 +00:00
}
2017-04-13 16:12:56 +00:00
2020-11-13 18:35:45 +00:00
ZooKeeperPtr DDLWorker : : tryGetZooKeeper ( ) const
2019-02-22 16:14:48 +00:00
{
std : : lock_guard lock ( zookeeper_mutex ) ;
return current_zookeeper ;
}
2020-11-13 18:35:45 +00:00
ZooKeeperPtr DDLWorker : : getAndSetZooKeeper ( )
2019-02-22 16:14:48 +00:00
{
std : : lock_guard lock ( zookeeper_mutex ) ;
if ( ! current_zookeeper | | current_zookeeper - > expired ( ) )
2021-04-10 23:33:54 +00:00
current_zookeeper = context - > getZooKeeper ( ) ;
2019-02-22 16:14:48 +00:00
return current_zookeeper ;
}
2020-09-02 22:35:47 +00:00
DDLTaskPtr DDLWorker : : initAndCheckTask ( const String & entry_name , String & out_reason , const ZooKeeperPtr & zookeeper )
2017-04-13 13:42:29 +00:00
{
2017-07-28 16:14:49 +00:00
String node_data ;
2020-12-03 17:53:55 +00:00
String entry_path = fs : : path ( queue_dir ) / entry_name ;
2017-08-01 14:41:00 +00:00
2020-11-27 14:04:03 +00:00
auto task = std : : make_unique < DDLTask > ( entry_name , entry_path ) ;
2020-11-19 10:34:45 +00:00
2017-08-01 14:41:00 +00:00
if ( ! zookeeper - > tryGet ( entry_path , node_data ) )
2017-07-28 16:14:49 +00:00
{
/// It is Ok that node could be deleted just now. It means that there are no current host in node's host list.
2017-08-12 20:00:00 +00:00
out_reason = " The task was deleted " ;
2020-09-02 22:35:47 +00:00
return { } ;
2017-07-28 16:14:49 +00:00
}
2017-04-13 16:12:56 +00:00
2020-11-29 11:45:32 +00:00
auto write_error_status = [ & ] ( const String & host_id , const String & error_message , const String & reason )
{
LOG_ERROR ( log , " Cannot parse DDL task {}: {}. Will try to send error status: {} " , entry_name , reason , error_message ) ;
createStatusDirs ( entry_path , zookeeper ) ;
2021-01-18 14:09:39 +00:00
zookeeper - > tryCreate ( fs : : path ( entry_path ) / " finished " / host_id , error_message , zkutil : : CreateMode : : Persistent ) ;
2020-11-29 11:45:32 +00:00
} ;
try
{
/// Stage 1: parse entry
task - > entry . parse ( node_data ) ;
}
catch ( . . . )
2017-07-28 16:14:49 +00:00
{
/// What should we do if we even cannot parse host name and therefore cannot properly submit execution status?
2019-01-22 19:56:53 +00:00
/// We can try to create fail node using FQDN if it equal to host name in cluster config attempt will be successful.
2022-05-09 19:13:02 +00:00
/// Otherwise, that node will be ignored by DDLQueryStatusSource.
2017-08-12 20:00:00 +00:00
out_reason = " Incorrect task format " ;
2020-11-29 11:45:32 +00:00
write_error_status ( host_fqdn_id , ExecutionStatus : : fromCurrentException ( ) . serializeText ( ) , out_reason ) ;
2020-09-02 22:35:47 +00:00
return { } ;
2017-07-28 16:14:49 +00:00
}
2020-11-29 11:45:32 +00:00
/// Stage 2: resolve host_id and check if we should execute query or not
2021-02-19 23:41:58 +00:00
/// Multiple clusters can use single DDL queue path in ZooKeeper,
/// So we should skip task if we cannot find current host in cluster hosts list.
2020-11-27 14:04:03 +00:00
if ( ! task - > findCurrentHostID ( context , log ) )
2020-09-02 22:35:47 +00:00
{
2017-08-12 20:00:00 +00:00
out_reason = " There is no a local address in host list " ;
2020-09-02 22:35:47 +00:00
return { } ;
}
2017-08-01 14:41:00 +00:00
2020-11-29 11:45:32 +00:00
try
{
/// Stage 3.1: parse query
task - > parseQueryFromEntry ( context ) ;
/// Stage 3.2: check cluster and find the host in cluster
task - > setClusterInfo ( context , log ) ;
}
catch ( . . . )
{
out_reason = " Cannot parse query or obtain cluster info " ;
write_error_status ( task - > host_id_str , ExecutionStatus : : fromCurrentException ( ) . serializeText ( ) , out_reason ) ;
return { } ;
}
2020-11-27 14:04:03 +00:00
2020-12-03 18:14:27 +00:00
if ( zookeeper - > exists ( task - > getFinishedNodePath ( ) ) )
{
2021-03-04 23:17:07 +00:00
out_reason = TASK_PROCESSED_OUT_REASON ;
2020-12-03 18:14:27 +00:00
return { } ;
}
2020-11-29 11:45:32 +00:00
/// Now task is ready for execution
2020-09-02 22:35:47 +00:00
return task ;
2017-07-28 16:14:49 +00:00
}
2017-07-27 11:30:27 +00:00
2017-08-02 14:42:35 +00:00
static void filterAndSortQueueNodes ( Strings & all_nodes )
{
2022-04-16 19:43:14 +00:00
std : : erase_if ( all_nodes , [ ] ( const String & s ) { return ! startsWith ( s , " query- " ) ; } ) ;
2022-01-30 19:49:48 +00:00
: : sort ( all_nodes . begin ( ) , all_nodes . end ( ) ) ;
2017-08-02 14:42:35 +00:00
}
2021-02-23 05:26:46 +00:00
void DDLWorker : : scheduleTasks ( bool reinitialized )
2017-07-28 16:14:49 +00:00
{
2020-09-02 22:35:47 +00:00
LOG_DEBUG ( log , " Scheduling tasks " ) ;
2019-02-22 16:14:48 +00:00
auto zookeeper = tryGetZooKeeper ( ) ;
2017-07-27 11:30:27 +00:00
2021-02-23 05:26:46 +00:00
/// Main thread of DDLWorker was restarted, probably due to lost connection with ZooKeeper.
2021-03-04 23:17:07 +00:00
/// We have some unfinished tasks.
/// To avoid duplication of some queries we should try to write execution status again.
/// To avoid skipping of some entries which were not executed we should be careful when choosing begin_node to start from.
/// NOTE: It does not protect from all cases of query duplication, see also comments in processTask(...)
2021-02-23 05:26:46 +00:00
if ( reinitialized )
2020-12-04 20:12:32 +00:00
{
2021-03-04 23:17:07 +00:00
if ( current_tasks . empty ( ) )
LOG_TRACE ( log , " Don't have unfinished tasks after restarting " ) ;
else
LOG_INFO ( log , " Have {} unfinished tasks, will check them " , current_tasks . size ( ) ) ;
assert ( current_tasks . size ( ) < = pool_size + ( worker_pool ! = nullptr ) ) ;
auto task_it = current_tasks . begin ( ) ;
while ( task_it ! = current_tasks . end ( ) )
2020-12-04 20:12:32 +00:00
{
2021-03-04 23:17:07 +00:00
auto & task = * task_it ;
if ( task - > completely_processed )
{
2022-06-29 14:27:21 +00:00
chassert ( task - > was_executed ) ;
2021-03-04 23:17:07 +00:00
/// Status must be written (but finished/ node may not exist if entry was deleted).
/// If someone is deleting entry concurrently, then /active status dir must not exist.
assert ( zookeeper - > exists ( task - > getFinishedNodePath ( ) ) | | ! zookeeper - > exists ( fs : : path ( task - > entry_path ) / " active " ) ) ;
+ + task_it ;
}
else if ( task - > was_executed )
2021-02-23 05:23:24 +00:00
{
2021-03-04 23:17:07 +00:00
/// Connection was lost on attempt to write status. Will retry.
2021-02-23 05:26:46 +00:00
bool status_written = zookeeper - > exists ( task - > getFinishedNodePath ( ) ) ;
2021-03-04 23:17:07 +00:00
/// You might think that the following condition is redundant, because status_written implies completely_processed.
/// But it's wrong. It's possible that (!task->completely_processed && status_written)
/// if ZooKeeper successfully received and processed our request
/// but we lost connection while waiting for the response.
/// Yeah, distributed systems is a zoo.
if ( status_written )
2022-07-27 17:15:00 +00:00
{
/// TODO We cannot guarantee that query was actually executed synchronously if connection was lost.
2022-07-28 11:07:46 +00:00
/// Let's simple create synced/ node for now, but it would be better to pass UNFINISHED status to initiator
2022-07-27 17:15:00 +00:00
/// or wait for query to actually finish (requires https://github.com/ClickHouse/ClickHouse/issues/23513)
task - > createSyncedNodeIfNeed ( zookeeper ) ;
2021-03-04 23:17:07 +00:00
task - > completely_processed = true ;
2022-07-27 17:15:00 +00:00
}
2021-03-04 23:17:07 +00:00
else
2021-02-23 05:26:46 +00:00
processTask ( * task , zookeeper ) ;
2021-03-04 23:17:07 +00:00
+ + task_it ;
}
else
{
/// We didn't even executed a query, so let's just remove it.
/// We will try to read the task again and execute it from the beginning.
2021-03-05 12:50:10 +00:00
/// NOTE: We can safely compare entry names as Strings, because they are padded.
/// Entry name always starts with "query-" and contain exactly 10 decimal digits
/// of log entry number (with leading zeros).
2021-03-04 23:17:07 +00:00
if ( ! first_failed_task_name | | task - > entry_name < * first_failed_task_name )
first_failed_task_name = task - > entry_name ;
task_it = current_tasks . erase ( task_it ) ;
2021-02-23 05:23:24 +00:00
}
2020-12-04 20:12:32 +00:00
}
}
2021-07-09 14:05:35 +00:00
Strings queue_nodes = zookeeper - > getChildren ( queue_dir , & queue_node_stat , queue_updated_event ) ;
2021-04-15 18:34:53 +00:00
size_t size_before_filtering = queue_nodes . size ( ) ;
2017-08-02 14:42:35 +00:00
filterAndSortQueueNodes ( queue_nodes ) ;
2022-05-20 10:41:44 +00:00
/// The following message is too verbose, but it can be useful to debug mysterious test failures in CI
2021-04-15 18:34:53 +00:00
LOG_TRACE ( log , " scheduleTasks: initialized={}, size_before_filtering={}, queue_size={}, "
" entries={}..{}, "
2021-07-05 00:02:35 +00:00
" first_failed_task_name={}, current_tasks_size={}, "
" last_current_task={}, "
2021-04-15 18:34:53 +00:00
" last_skipped_entry_name={} " ,
initialized , size_before_filtering , queue_nodes . size ( ) ,
queue_nodes . empty ( ) ? " none " : queue_nodes . front ( ) , queue_nodes . empty ( ) ? " none " : queue_nodes . back ( ) ,
first_failed_task_name ? * first_failed_task_name : " none " , current_tasks . size ( ) ,
current_tasks . empty ( ) ? " none " : current_tasks . back ( ) - > entry_name ,
last_skipped_entry_name ? * last_skipped_entry_name : " none " ) ;
2021-03-04 23:17:07 +00:00
if ( max_tasks_in_queue < queue_nodes . size ( ) )
2021-02-16 14:05:58 +00:00
cleanup_event - > set ( ) ;
2017-07-26 19:31:32 +00:00
2021-02-24 18:22:36 +00:00
/// Detect queue start, using:
/// - skipped tasks
2021-03-04 23:17:07 +00:00
/// - in memory tasks (that are currently active or were finished recently)
/// - failed tasks (that should be processed again)
2020-12-04 20:12:32 +00:00
auto begin_node = queue_nodes . begin ( ) ;
2021-03-04 23:17:07 +00:00
if ( first_failed_task_name )
2020-12-04 20:12:32 +00:00
{
2021-03-04 23:17:07 +00:00
/// If we had failed tasks, then we should start from the first failed task.
2022-06-29 14:27:21 +00:00
chassert ( reinitialized ) ;
2021-03-04 23:17:07 +00:00
begin_node = std : : lower_bound ( queue_nodes . begin ( ) , queue_nodes . end ( ) , first_failed_task_name ) ;
2021-02-24 18:22:36 +00:00
}
2021-03-04 23:17:07 +00:00
else
2021-02-24 18:22:36 +00:00
{
2021-03-04 23:17:07 +00:00
/// We had no failed tasks. Let's just choose the maximum entry we have previously seen.
String last_task_name ;
if ( ! current_tasks . empty ( ) )
last_task_name = current_tasks . back ( ) - > entry_name ;
if ( last_skipped_entry_name & & last_task_name < * last_skipped_entry_name )
last_task_name = * last_skipped_entry_name ;
begin_node = std : : upper_bound ( queue_nodes . begin ( ) , queue_nodes . end ( ) , last_task_name ) ;
2020-12-04 20:12:32 +00:00
}
2021-01-26 17:51:25 +00:00
2021-03-04 23:17:07 +00:00
if ( begin_node = = queue_nodes . end ( ) )
LOG_DEBUG ( log , " No tasks to schedule " ) ;
else
LOG_DEBUG ( log , " Will schedule {} tasks starting from {} " , std : : distance ( begin_node , queue_nodes . end ( ) ) , * begin_node ) ;
/// Let's ensure that it's exactly the first task we should process.
/// Maybe such asserts are too paranoid and excessive,
/// but it's easy enough to break DDLWorker in a very unobvious way by making some minor change in code.
[[maybe_unused]] bool have_no_tasks_info = ! first_failed_task_name & & current_tasks . empty ( ) & & ! last_skipped_entry_name ;
assert ( have_no_tasks_info | | queue_nodes . end ( ) = = std : : find_if ( queue_nodes . begin ( ) , queue_nodes . end ( ) , [ & ] ( const String & entry_name )
{
/// We should return true if some invariants are violated.
String reason ;
auto task = initAndCheckTask ( entry_name , reason , zookeeper ) ;
bool maybe_currently_processing = current_tasks . end ( ) ! = std : : find_if ( current_tasks . begin ( ) , current_tasks . end ( ) , [ & ] ( const auto & t )
{
return t - > entry_name = = entry_name ;
} ) ;
/// begin_node is something like a log pointer
if ( begin_node = = queue_nodes . end ( ) | | entry_name < * begin_node )
{
/// Return true if entry should be scheduled.
/// There is a minor race condition: initAndCheckTask(...) may return not null
/// if someone is deleting outdated entry right now (including finished/ nodes), so we also check active/ status dir.
bool maybe_concurrently_deleting = task & & ! zookeeper - > exists ( fs : : path ( task - > entry_path ) / " active " ) ;
return task & & ! maybe_concurrently_deleting & & ! maybe_currently_processing ;
}
2022-03-22 13:43:52 +00:00
else if ( last_skipped_entry_name . has_value ( ) & & ! queue_fully_loaded_after_initialization_debug_helper )
{
/// If connection was lost during queue loading
/// we may start processing from finished task (because we don't know yet that it's finished) and it's ok.
return false ;
}
2021-03-04 23:17:07 +00:00
else
{
/// Return true if entry should not be scheduled.
bool processed = ! task & & reason = = TASK_PROCESSED_OUT_REASON ;
return processed | | maybe_currently_processing ;
}
} ) ) ;
2020-11-19 10:34:45 +00:00
for ( auto it = begin_node ; it ! = queue_nodes . end ( ) & & ! stop_flag ; + + it )
2017-07-28 16:14:49 +00:00
{
String entry_name = * it ;
2020-11-24 10:24:39 +00:00
LOG_TRACE ( log , " Checking task {} " , entry_name ) ;
2017-07-28 16:14:49 +00:00
2020-09-02 22:35:47 +00:00
String reason ;
auto task = initAndCheckTask ( entry_name , reason , zookeeper ) ;
2022-03-22 13:43:52 +00:00
if ( task )
{
queue_fully_loaded_after_initialization_debug_helper = true ;
}
else
2017-07-26 19:31:32 +00:00
{
2020-09-02 22:35:47 +00:00
LOG_DEBUG ( log , " Will not execute task {}: {} " , entry_name , reason ) ;
2021-02-03 20:02:37 +00:00
updateMaxDDLEntryID ( entry_name ) ;
2021-02-04 19:41:44 +00:00
last_skipped_entry_name . emplace ( entry_name ) ;
2020-09-02 22:35:47 +00:00
continue ;
2017-07-26 19:31:32 +00:00
}
2020-12-03 18:14:27 +00:00
auto & saved_task = saveTask ( std : : move ( task ) ) ;
2017-04-17 17:04:31 +00:00
2020-12-03 18:14:27 +00:00
if ( worker_pool )
2017-04-17 17:04:31 +00:00
{
2021-02-22 15:39:10 +00:00
worker_pool - > scheduleOrThrowOnError ( [ this , & saved_task , zookeeper ] ( )
2020-11-19 10:34:45 +00:00
{
2020-12-03 18:14:27 +00:00
setThreadName ( " DDLWorkerExec " ) ;
2021-02-19 23:41:58 +00:00
processTask ( saved_task , zookeeper ) ;
2020-12-03 18:14:27 +00:00
} ) ;
2017-04-17 17:04:31 +00:00
}
2017-04-27 15:19:11 +00:00
else
{
2021-02-19 23:41:58 +00:00
processTask ( saved_task , zookeeper ) ;
2017-07-28 16:14:49 +00:00
}
2020-12-03 18:14:27 +00:00
}
}
2017-07-28 16:14:49 +00:00
2020-12-03 18:14:27 +00:00
DDLTaskBase & DDLWorker : : saveTask ( DDLTaskPtr & & task )
{
2021-01-28 13:48:17 +00:00
current_tasks . remove_if ( [ ] ( const DDLTaskPtr & t ) { return t - > completely_processed . load ( ) ; } ) ;
2021-03-04 23:17:07 +00:00
/// Tasks are scheduled and executed in main thread <==> Parallel execution is disabled
assert ( ( worker_pool ! = nullptr ) = = ( 1 < pool_size ) ) ;
/// Parallel execution is disabled ==> All previous tasks are failed to start or finished,
/// so current tasks list must be empty when we are ready to process new one.
assert ( worker_pool | | current_tasks . empty ( ) ) ;
/// Parallel execution is enabled ==> Not more than pool_size tasks are currently executing.
/// Note: If current_tasks.size() == pool_size, then all worker threads are busy,
/// so we will wait on worker_pool->scheduleOrThrowOnError(...)
assert ( ! worker_pool | | current_tasks . size ( ) < = pool_size ) ;
2020-12-03 18:14:27 +00:00
current_tasks . emplace_back ( std : : move ( task ) ) ;
2021-03-04 23:17:07 +00:00
if ( first_failed_task_name & & * first_failed_task_name = = current_tasks . back ( ) - > entry_name )
first_failed_task_name . reset ( ) ;
2020-12-03 18:14:27 +00:00
return * current_tasks . back ( ) ;
2017-07-28 16:14:49 +00:00
}
2021-02-19 23:41:58 +00:00
bool DDLWorker : : tryExecuteQuery ( const String & query , DDLTaskBase & task , const ZooKeeperPtr & zookeeper )
2017-04-25 15:21:03 +00:00
{
2017-07-28 16:14:49 +00:00
/// Add special comment at the start of query to easily identify DDL-produced queries in query_log
2017-08-11 20:20:15 +00:00
String query_prefix = " /* ddl_entry= " + task . entry_name + " */ " ;
2017-07-27 18:44:55 +00:00
String query_to_execute = query_prefix + query ;
ReadBufferFromString istr ( query_to_execute ) ;
String dummy_string ;
WriteBufferFromString ostr ( dummy_string ) ;
2021-01-28 13:48:17 +00:00
std : : optional < CurrentThread : : QueryScope > query_scope ;
2017-07-27 18:44:55 +00:00
2017-04-25 15:21:03 +00:00
try
{
2021-02-19 23:41:58 +00:00
auto query_context = task . makeQueryContext ( context , zookeeper ) ;
2021-02-01 19:29:47 +00:00
if ( ! task . is_initial_query )
2021-04-10 23:33:54 +00:00
query_scope . emplace ( query_context ) ;
executeQuery ( istr , ostr , ! task . is_initial_query , query_context , { } ) ;
2021-01-28 19:02:39 +00:00
2021-02-19 23:41:58 +00:00
if ( auto txn = query_context - > getZooKeeperMetadataTransaction ( ) )
2021-01-28 19:02:39 +00:00
{
2021-02-19 23:41:58 +00:00
/// Most queries commit changes to ZooKeeper right before applying local changes,
/// but some queries does not support it, so we have to do it here.
if ( ! txn - > isExecuted ( ) )
2021-01-28 19:02:39 +00:00
txn - > commit ( ) ;
}
2017-04-25 15:21:03 +00:00
}
2020-12-04 20:12:32 +00:00
catch ( const DB : : Exception & e )
{
2021-02-01 19:29:47 +00:00
if ( task . is_initial_query )
throw ;
2020-12-04 20:12:32 +00:00
task . execution_status = ExecutionStatus : : fromCurrentException ( ) ;
tryLogCurrentException ( log , " Query " + query + " wasn't finished successfully " ) ;
/// We use return value of tryExecuteQuery(...) in tryExecuteQueryOnLeaderReplica(...) to determine
/// if replica has stopped being leader and we should retry query.
/// However, for the majority of exceptions there is no sense to retry, because most likely we will just
/// get the same exception again. So we return false only for several special exception codes,
/// and consider query as executed with status "failed" and return true in other cases.
bool no_sense_to_retry = e . code ( ) ! = ErrorCodes : : KEEPER_EXCEPTION & &
e . code ( ) ! = ErrorCodes : : NOT_A_LEADER & &
2022-04-28 19:39:45 +00:00
e . code ( ) ! = ErrorCodes : : TABLE_IS_READ_ONLY & &
2020-12-04 20:12:32 +00:00
e . code ( ) ! = ErrorCodes : : CANNOT_ASSIGN_ALTER & &
e . code ( ) ! = ErrorCodes : : CANNOT_ALLOCATE_MEMORY & &
e . code ( ) ! = ErrorCodes : : MEMORY_LIMIT_EXCEEDED ;
return no_sense_to_retry ;
}
2017-04-25 15:21:03 +00:00
catch ( . . . )
{
2021-02-01 19:29:47 +00:00
if ( task . is_initial_query )
throw ;
2020-12-04 20:12:32 +00:00
task . execution_status = ExecutionStatus : : fromCurrentException ( ) ;
2017-07-27 18:44:55 +00:00
tryLogCurrentException ( log , " Query " + query + " wasn't finished successfully " ) ;
2017-04-25 15:21:03 +00:00
2020-12-04 20:12:32 +00:00
/// We don't know what exactly happened, but maybe it's Poco::NetException or std::bad_alloc,
/// so we consider unknown exception as retryable error.
2017-04-25 15:21:03 +00:00
return false ;
}
2020-12-04 20:12:32 +00:00
task . execution_status = ExecutionStatus ( 0 ) ;
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Executed query: {} " , query ) ;
2017-04-25 15:21:03 +00:00
return true ;
}
2021-02-03 20:02:37 +00:00
void DDLWorker : : updateMaxDDLEntryID ( const String & entry_name )
2021-02-01 12:32:57 +00:00
{
2021-02-09 15:14:20 +00:00
UInt64 id = DDLTaskBase : : getLogEntryNumber ( entry_name ) ;
2021-02-01 12:32:57 +00:00
auto prev_id = max_id . load ( std : : memory_order_relaxed ) ;
while ( prev_id < id )
{
if ( max_id . compare_exchange_weak ( prev_id , id ) )
{
2021-02-15 10:26:34 +00:00
if ( max_entry_metric )
CurrentMetrics : : set ( * max_entry_metric , id ) ;
2021-02-01 12:32:57 +00:00
break ;
}
}
}
2021-02-19 23:41:58 +00:00
void DDLWorker : : processTask ( DDLTaskBase & task , const ZooKeeperPtr & zookeeper )
2017-04-25 15:21:03 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Processing task {} ({}) " , task . entry_name , task . entry . query ) ;
2022-06-29 14:27:21 +00:00
chassert ( ! task . completely_processed ) ;
2017-04-25 15:21:03 +00:00
2020-11-27 14:04:03 +00:00
String active_node_path = task . getActiveNodePath ( ) ;
String finished_node_path = task . getFinishedNodePath ( ) ;
2017-08-02 14:42:35 +00:00
2021-03-04 23:17:07 +00:00
/// Step 1: Create ephemeral node in active/ status dir.
/// It allows other hosts to understand that task is currently executing (useful for system.distributed_ddl_queue)
/// and protects from concurrent deletion or the task.
2021-02-12 16:22:01 +00:00
/// It will tryRemove(...) on exception
auto active_node = zkutil : : EphemeralNodeHolder : : existing ( active_node_path , * zookeeper ) ;
/// Try fast path
2021-02-10 20:30:40 +00:00
auto create_active_res = zookeeper - > tryCreate ( active_node_path , { } , zkutil : : CreateMode : : Ephemeral ) ;
if ( create_active_res ! = Coordination : : Error : : ZOK )
{
2021-02-12 16:22:01 +00:00
if ( create_active_res ! = Coordination : : Error : : ZNONODE & & create_active_res ! = Coordination : : Error : : ZNODEEXISTS )
{
2022-06-29 14:27:21 +00:00
chassert ( Coordination : : isHardwareError ( create_active_res ) ) ;
2021-02-10 20:30:40 +00:00
throw Coordination : : Exception ( create_active_res , active_node_path ) ;
2021-02-12 16:22:01 +00:00
}
/// Status dirs were not created in enqueueQuery(...) or someone is removing entry
if ( create_active_res = = Coordination : : Error : : ZNONODE )
2021-03-04 23:17:07 +00:00
{
2022-06-29 14:27:21 +00:00
chassert ( dynamic_cast < DatabaseReplicatedTask * > ( & task ) = = nullptr ) ;
2021-03-04 23:17:07 +00:00
if ( task . was_executed )
{
/// Special case:
/// Task was executed (and we are trying to write status after connection loss) ==> Status dirs were previously created.
/// (Status dirs were previously created AND active/ does not exist) ==> Task was removed.
/// We cannot write status, but it's not required anymore, because no one will try to execute it again.
/// So we consider task as completely processed.
LOG_WARNING ( log , " Task {} is executed, but looks like entry {} was deleted, cannot write status " , task . entry_name , task . entry_path ) ;
task . completely_processed = true ;
return ;
}
2021-02-12 16:22:01 +00:00
createStatusDirs ( task . entry_path , zookeeper ) ;
2021-03-04 23:17:07 +00:00
}
2021-02-12 16:22:01 +00:00
if ( create_active_res = = Coordination : : Error : : ZNODEEXISTS )
{
2021-02-16 14:05:58 +00:00
/// Connection has been lost and now we are retrying,
2021-02-12 16:22:01 +00:00
/// but our previous ephemeral node still exists.
2022-04-14 13:31:05 +00:00
zookeeper - > waitForEphemeralToDisappearIfAny ( active_node_path ) ;
2021-02-12 16:22:01 +00:00
}
2021-02-10 20:30:40 +00:00
zookeeper - > create ( active_node_path , { } , zkutil : : CreateMode : : Ephemeral ) ;
}
2018-11-06 14:42:30 +00:00
2021-11-22 16:46:34 +00:00
/// We must hold the lock until task execution status is committed to ZooKeeper,
/// otherwise another replica may try to execute query again.
2022-01-17 11:52:51 +00:00
std : : unique_ptr < zkutil : : ZooKeeperLock > execute_on_leader_lock ;
2021-11-22 16:46:34 +00:00
2021-03-04 23:17:07 +00:00
/// Step 2: Execute query from the task.
2020-12-04 20:12:32 +00:00
if ( ! task . was_executed )
2017-08-02 14:42:35 +00:00
{
2020-12-04 20:12:32 +00:00
/// If table and database engine supports it, they will execute task.ops by their own in a single transaction
/// with other zk operations (such as appending something to ReplicatedMergeTree log, or
/// updating metadata in Replicated database), so we make create request for finished_node_path with status "0",
/// which means that query executed successfully.
task . ops . emplace_back ( zkutil : : makeRemoveRequest ( active_node_path , - 1 ) ) ;
2021-02-08 09:14:17 +00:00
task . ops . emplace_back ( zkutil : : makeCreateRequest ( finished_node_path , ExecutionStatus ( 0 ) . serializeText ( ) , zkutil : : CreateMode : : Persistent ) ) ;
2017-04-25 15:21:03 +00:00
2017-04-27 15:19:11 +00:00
try
{
2020-11-27 14:04:03 +00:00
String rewritten_query = queryToString ( task . query ) ;
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Executing query: {} " , rewritten_query ) ;
2017-04-25 15:21:03 +00:00
2020-11-27 14:04:03 +00:00
StoragePtr storage ;
if ( auto * query_with_table = dynamic_cast < ASTQueryWithTableAndOutput * > ( task . query . get ( ) ) ; query_with_table )
2017-04-27 15:19:11 +00:00
{
2021-11-11 13:28:18 +00:00
if ( query_with_table - > table )
2020-01-09 16:01:44 +00:00
{
/// It's not CREATE DATABASE
2021-04-10 23:33:54 +00:00
auto table_id = context - > tryResolveStorageID ( * query_with_table , Context : : ResolveOrdinary ) ;
2021-02-04 19:41:44 +00:00
storage = DatabaseCatalog : : instance ( ) . tryGetTable ( table_id , context ) ;
2020-01-09 16:01:44 +00:00
}
2019-06-26 14:52:20 +00:00
2020-11-27 14:04:03 +00:00
task . execute_on_leader = storage & & taskShouldBeExecutedOnLeader ( task . query , storage ) & & ! task . is_circular_replicated ;
2017-04-27 15:19:11 +00:00
}
2020-11-27 14:04:03 +00:00
if ( task . execute_on_leader )
2021-01-25 18:59:23 +00:00
{
2021-11-22 16:46:34 +00:00
tryExecuteQueryOnLeaderReplica ( task , storage , rewritten_query , task . entry_path , zookeeper , execute_on_leader_lock ) ;
2021-01-25 18:59:23 +00:00
}
2017-04-27 15:19:11 +00:00
else
2021-01-25 18:59:23 +00:00
{
storage . reset ( ) ;
2021-02-19 23:41:58 +00:00
tryExecuteQuery ( rewritten_query , task , zookeeper ) ;
2021-01-25 18:59:23 +00:00
}
2017-04-27 15:19:11 +00:00
}
2018-08-25 01:58:14 +00:00
catch ( const Coordination : : Exception & )
2017-04-27 15:19:11 +00:00
{
throw ;
}
catch ( . . . )
{
2021-02-01 19:29:47 +00:00
if ( task . is_initial_query )
throw ;
2019-12-19 19:39:49 +00:00
tryLogCurrentException ( log , " An error occurred before execution of DDL task: " ) ;
task . execution_status = ExecutionStatus : : fromCurrentException ( " An error occurred before execution " ) ;
2017-04-27 15:19:11 +00:00
}
2020-12-04 20:12:32 +00:00
if ( task . execution_status . code ! = 0 )
{
bool status_written_by_table_or_db = task . ops . empty ( ) ;
if ( status_written_by_table_or_db )
{
throw Exception ( ErrorCodes : : UNFINISHED , " Unexpected error: {} " , task . execution_status . serializeText ( ) ) ;
}
else
{
2021-02-19 23:41:58 +00:00
/// task.ops where not executed by table or database engine, so DDLWorker is responsible for
2020-12-04 20:12:32 +00:00
/// writing query execution status into ZooKeeper.
task . ops . emplace_back ( zkutil : : makeSetRequest ( finished_node_path , task . execution_status . serializeText ( ) , - 1 ) ) ;
}
}
2020-08-08 01:01:47 +00:00
/// We need to distinguish ZK errors occurred before and after query executing
2017-07-28 16:14:49 +00:00
task . was_executed = true ;
2017-04-25 15:21:03 +00:00
}
2021-03-04 23:17:07 +00:00
/// Step 3: Create node in finished/ status dir and write execution status.
2017-07-28 16:14:49 +00:00
/// FIXME: if server fails right here, the task will be executed twice. We need WAL here.
2021-02-04 19:41:44 +00:00
/// NOTE: If ZooKeeper connection is lost here, we will try again to write query status.
/// NOTE: If both table and database are replicated, task is executed in single ZK transaction.
2020-12-03 18:14:27 +00:00
2020-12-04 20:12:32 +00:00
bool status_written = task . ops . empty ( ) ;
if ( ! status_written )
{
zookeeper - > multi ( task . ops ) ;
task . ops . clear ( ) ;
}
2021-01-26 17:51:25 +00:00
2021-02-12 16:22:01 +00:00
/// Active node was removed in multi ops
2021-02-19 23:41:58 +00:00
active_node - > setAlreadyRemoved ( ) ;
2021-02-12 16:22:01 +00:00
2022-07-27 17:15:00 +00:00
task . createSyncedNodeIfNeed ( zookeeper ) ;
2021-01-26 17:51:25 +00:00
task . completely_processed = true ;
2022-04-05 15:36:53 +00:00
updateMaxDDLEntryID ( task . entry_name ) ;
2017-04-27 15:19:11 +00:00
}
2017-04-25 15:21:03 +00:00
2021-02-15 10:26:34 +00:00
bool DDLWorker : : taskShouldBeExecutedOnLeader ( const ASTPtr & ast_ddl , const StoragePtr storage )
2017-04-27 15:19:11 +00:00
{
2019-06-26 14:52:20 +00:00
/// Pure DROP queries have to be executed on each node separately
2020-04-22 06:01:33 +00:00
if ( auto * query = ast_ddl - > as < ASTDropQuery > ( ) ; query & & query - > kind ! = ASTDropQuery : : Kind : : Truncate )
2019-06-26 14:52:20 +00:00
return false ;
2017-04-25 15:21:03 +00:00
2022-07-25 16:00:54 +00:00
if ( ! ast_ddl - > as < ASTAlterQuery > ( ) & &
! ast_ddl - > as < ASTOptimizeQuery > ( ) & &
! ast_ddl - > as < ASTDropQuery > ( ) & &
! ast_ddl - > as < ASTCreateIndexQuery > ( ) & &
! ast_ddl - > as < ASTDropIndexQuery > ( ) )
2019-06-26 14:52:20 +00:00
return false ;
2017-04-25 15:21:03 +00:00
2021-01-06 12:05:19 +00:00
if ( auto * alter = ast_ddl - > as < ASTAlterQuery > ( ) )
{
// Setting alters should be executed on all replicas
if ( alter - > isSettingsAlter ( ) )
return false ;
2021-01-26 17:51:25 +00:00
if ( alter - > isFreezeAlter ( ) )
return false ;
2021-01-06 12:05:19 +00:00
}
2019-06-26 14:52:20 +00:00
return storage - > supportsReplication ( ) ;
}
2017-04-25 15:21:03 +00:00
2019-06-26 14:52:20 +00:00
bool DDLWorker : : tryExecuteQueryOnLeaderReplica (
2020-11-27 14:04:03 +00:00
DDLTaskBase & task ,
2019-06-26 14:52:20 +00:00
StoragePtr storage ,
const String & rewritten_query ,
2020-11-27 14:04:03 +00:00
const String & /*node_path*/ ,
2021-11-22 16:46:34 +00:00
const ZooKeeperPtr & zookeeper ,
2022-01-17 11:52:51 +00:00
std : : unique_ptr < zkutil : : ZooKeeperLock > & execute_on_leader_lock )
2019-06-26 14:52:20 +00:00
{
StorageReplicatedMergeTree * replicated_storage = dynamic_cast < StorageReplicatedMergeTree * > ( storage . get ( ) ) ;
/// If we will develop new replicated storage
if ( ! replicated_storage )
2020-12-04 20:12:32 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Storage type '{}' is not supported by distributed DDL " , storage - > getName ( ) ) ;
2017-04-25 15:21:03 +00:00
2020-11-27 14:04:03 +00:00
String shard_path = task . getShardNodePath ( ) ;
2020-12-03 17:53:55 +00:00
String is_executed_path = fs : : path ( shard_path ) / " executed " ;
String tries_to_execute_path = fs : : path ( shard_path ) / " tries_to_execute " ;
2021-09-15 18:06:20 +00:00
assert ( shard_path . starts_with ( String ( fs : : path ( task . entry_path ) / " shards " / " " ) ) ) ;
zookeeper - > createIfNotExists ( fs : : path ( task . entry_path ) / " shards " , " " ) ;
zookeeper - > createIfNotExists ( shard_path , " " ) ;
2017-04-25 15:21:03 +00:00
2020-12-04 20:12:32 +00:00
/// Leader replica creates is_executed_path node on successful query execution.
/// We will remove create_shard_flag from zk operations list, if current replica is just waiting for leader to execute the query.
auto create_shard_flag = zkutil : : makeCreateRequest ( is_executed_path , task . host_id_str , zkutil : : CreateMode : : Persistent ) ;
2020-08-07 09:18:34 +00:00
/// Node exists, or we will create or we will get an exception
zookeeper - > tryCreate ( tries_to_execute_path , " 0 " , zkutil : : CreateMode : : Persistent ) ;
2018-12-21 17:28:21 +00:00
2020-08-07 09:18:34 +00:00
static constexpr int MAX_TRIES_TO_EXECUTE = 3 ;
2020-11-16 12:34:12 +00:00
static constexpr int MAX_EXECUTION_TIMEOUT_SEC = 3600 ;
2020-08-07 09:18:34 +00:00
String executed_by ;
zkutil : : EventPtr event = std : : make_shared < Poco : : Event > ( ) ;
2020-11-16 12:34:12 +00:00
/// We must use exists request instead of get, because zookeeper will not setup event
/// for non existing node after get request
if ( zookeeper - > exists ( is_executed_path , nullptr , event ) )
2020-08-07 09:18:34 +00:00
{
2020-11-16 12:34:12 +00:00
LOG_DEBUG ( log , " Task {} has already been executed by replica ({}) of the same shard. " , task . entry_name , zookeeper - > get ( is_executed_path ) ) ;
2022-05-06 16:37:20 +00:00
if ( auto op = task . getOpToUpdateLogPointer ( ) )
task . ops . push_back ( op ) ;
2020-08-07 09:18:34 +00:00
return true ;
}
2018-12-21 17:28:21 +00:00
2019-06-26 14:52:20 +00:00
pcg64 rng ( randomSeed ( ) ) ;
2018-12-21 17:28:21 +00:00
2021-11-22 16:46:34 +00:00
execute_on_leader_lock = createSimpleZooKeeperLock ( zookeeper , shard_path , " lock " , task . host_id_str ) ;
2020-08-07 09:18:34 +00:00
2020-11-16 12:34:12 +00:00
Stopwatch stopwatch ;
2020-12-04 20:12:32 +00:00
bool executed_by_us = false ;
bool executed_by_other_leader = false ;
2020-11-16 12:34:12 +00:00
/// Defensive programming. One hour is more than enough to execute almost all DDL queries.
/// If it will be very long query like ALTER DELETE for a huge table it's still will be executed,
/// but DDL worker can continue processing other queries.
while ( stopwatch . elapsedSeconds ( ) < = MAX_EXECUTION_TIMEOUT_SEC )
2019-06-26 14:52:20 +00:00
{
StorageReplicatedMergeTree : : Status status ;
2021-01-28 12:12:00 +00:00
// Has to get with zk fields to get active replicas field
replicated_storage - > getStatus ( status , true ) ;
2017-04-25 15:21:03 +00:00
2021-01-28 12:12:00 +00:00
// Should return as soon as possible if the table is dropped.
bool replica_dropped = replicated_storage - > is_dropped ;
bool all_replicas_likely_detached = status . active_replicas = = 0 & & ! DatabaseCatalog : : instance ( ) . isTableExist ( replicated_storage - > getStorageID ( ) , context ) ;
2021-02-01 02:40:48 +00:00
if ( replica_dropped | | all_replicas_likely_detached )
2021-01-27 05:56:36 +00:00
{
2021-02-01 02:40:48 +00:00
LOG_WARNING ( log , " , task {} will not be executed. " , task . entry_name ) ;
task . execution_status = ExecutionStatus ( ErrorCodes : : UNFINISHED , " Cannot execute replicated DDL query, table is dropped or detached permanently " ) ;
2021-01-27 02:53:10 +00:00
return false ;
}
2017-04-25 15:21:03 +00:00
2021-02-01 19:29:47 +00:00
if ( task . is_initial_query & & ! status . is_leader )
throw Exception ( ErrorCodes : : NOT_A_LEADER , " Cannot execute initial query on non-leader replica " ) ;
2020-08-07 09:18:34 +00:00
/// Any replica which is leader tries to take lock
2021-11-22 16:46:34 +00:00
if ( status . is_leader & & execute_on_leader_lock - > tryLock ( ) )
2019-06-26 14:52:20 +00:00
{
2020-08-24 09:07:37 +00:00
/// In replicated merge tree we can have multiple leaders. So we can
2020-11-16 12:34:12 +00:00
/// be "leader" and took lock, but another "leader" replica may have
/// already executed this task.
2020-08-24 09:07:37 +00:00
if ( zookeeper - > tryGet ( is_executed_path , executed_by ) )
{
LOG_DEBUG ( log , " Task {} has already been executed by replica ({}) of the same shard. " , task . entry_name , executed_by ) ;
2020-12-04 20:12:32 +00:00
executed_by_other_leader = true ;
2022-05-06 16:37:20 +00:00
if ( auto op = task . getOpToUpdateLogPointer ( ) )
task . ops . push_back ( op ) ;
2020-08-24 09:07:37 +00:00
break ;
}
2020-11-16 12:34:12 +00:00
/// Checking and incrementing counter exclusively.
2020-08-07 09:18:34 +00:00
size_t counter = parse < int > ( zookeeper - > get ( tries_to_execute_path ) ) ;
if ( counter > MAX_TRIES_TO_EXECUTE )
2019-06-26 14:52:20 +00:00
break ;
2020-08-24 09:07:37 +00:00
2020-08-07 09:18:34 +00:00
zookeeper - > set ( tries_to_execute_path , toString ( counter + 1 ) ) ;
2017-04-25 15:21:03 +00:00
2020-12-04 20:12:32 +00:00
task . ops . push_back ( create_shard_flag ) ;
2021-04-04 09:23:40 +00:00
SCOPE_EXIT_MEMORY ( { if ( ! executed_by_us & & ! task . ops . empty ( ) ) task . ops . pop_back ( ) ; } ) ;
2020-12-04 20:12:32 +00:00
2019-06-26 14:52:20 +00:00
/// If the leader will unexpectedly changed this method will return false
/// and on the next iteration new leader will take lock
2021-02-19 23:41:58 +00:00
if ( tryExecuteQuery ( rewritten_query , task , zookeeper ) )
2019-06-26 14:52:20 +00:00
{
2020-12-04 20:12:32 +00:00
executed_by_us = true ;
2019-06-26 14:52:20 +00:00
break ;
2017-04-25 15:21:03 +00:00
}
2018-04-19 19:25:54 +00:00
}
2019-06-26 14:52:20 +00:00
2020-11-16 12:34:12 +00:00
/// Waiting for someone who will execute query and change is_executed_path node
2020-08-07 09:18:34 +00:00
if ( event - > tryWait ( std : : uniform_int_distribution < int > ( 0 , 1000 ) ( rng ) ) )
{
2020-09-30 14:58:27 +00:00
LOG_DEBUG ( log , " Task {} has already been executed by replica ({}) of the same shard. " , task . entry_name , zookeeper - > get ( is_executed_path ) ) ;
2020-12-04 20:12:32 +00:00
executed_by_other_leader = true ;
2022-05-06 16:37:20 +00:00
if ( auto op = task . getOpToUpdateLogPointer ( ) )
task . ops . push_back ( op ) ;
2020-08-07 09:18:34 +00:00
break ;
}
2020-11-16 12:34:12 +00:00
else
2020-08-07 09:18:34 +00:00
{
2020-11-16 12:34:12 +00:00
String tries_count ;
zookeeper - > tryGet ( tries_to_execute_path , tries_count ) ;
if ( parse < int > ( tries_count ) > MAX_TRIES_TO_EXECUTE )
{
/// Nobody will try to execute query again
LOG_WARNING ( log , " Maximum retries count for task {} exceeded, cannot execute replicated DDL query " , task . entry_name ) ;
break ;
}
else
{
/// Will try to wait or execute
LOG_TRACE ( log , " Task {} still not executed, will try to wait for it or execute ourselves, tries count {} " , task . entry_name , tries_count ) ;
}
2020-08-07 09:18:34 +00:00
}
2017-04-25 15:21:03 +00:00
}
2019-06-26 14:52:20 +00:00
2022-06-29 14:27:21 +00:00
chassert ( ! ( executed_by_us & & executed_by_other_leader ) ) ;
2020-12-04 20:12:32 +00:00
2019-06-26 14:52:20 +00:00
/// Not executed by leader so was not executed at all
2020-12-04 20:12:32 +00:00
if ( ! executed_by_us & & ! executed_by_other_leader )
2017-04-25 15:21:03 +00:00
{
2020-11-16 12:34:12 +00:00
/// If we failed with timeout
if ( stopwatch . elapsedSeconds ( ) > = MAX_EXECUTION_TIMEOUT_SEC )
{
LOG_WARNING ( log , " Task {} was not executed by anyone, maximum timeout {} seconds exceeded " , task . entry_name , MAX_EXECUTION_TIMEOUT_SEC ) ;
2020-11-17 11:04:27 +00:00
task . execution_status = ExecutionStatus ( ErrorCodes : : TIMEOUT_EXCEEDED , " Cannot execute replicated DDL query, timeout exceeded " ) ;
2020-11-16 12:34:12 +00:00
}
else /// If we exceeded amount of tries
{
LOG_WARNING ( log , " Task {} was not executed by anyone, maximum number of retries exceeded " , task . entry_name ) ;
2021-03-16 20:17:07 +00:00
task . execution_status = ExecutionStatus ( ErrorCodes : : UNFINISHED , " Cannot execute replicated DDL query, maximum retries exceeded " ) ;
2020-11-16 12:34:12 +00:00
}
2019-06-26 14:52:20 +00:00
return false ;
2017-04-25 15:21:03 +00:00
}
2020-08-07 09:18:34 +00:00
2020-12-04 20:12:32 +00:00
if ( executed_by_us )
LOG_DEBUG ( log , " Task {} executed by current replica " , task . entry_name ) ;
else // if (executed_by_other_leader)
LOG_DEBUG ( log , " Task {} has already been executed by replica ({}) of the same shard. " , task . entry_name , zookeeper - > get ( is_executed_path ) ) ;
2019-06-26 14:52:20 +00:00
return true ;
2017-04-25 15:21:03 +00:00
}
2021-02-09 15:14:20 +00:00
void DDLWorker : : cleanupQueue ( Int64 , const ZooKeeperPtr & zookeeper )
2017-04-19 14:21:27 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Cleaning queue " ) ;
2017-04-27 15:19:11 +00:00
2017-08-02 14:42:35 +00:00
Strings queue_nodes = zookeeper - > getChildren ( queue_dir ) ;
filterAndSortQueueNodes ( queue_nodes ) ;
for ( auto it = queue_nodes . cbegin ( ) ; it < queue_nodes . cend ( ) ; + + it )
2017-04-19 14:21:27 +00:00
{
2019-02-22 18:36:36 +00:00
if ( stop_flag )
return ;
2017-08-02 14:42:35 +00:00
String node_name = * it ;
2020-12-03 17:53:55 +00:00
String node_path = fs : : path ( queue_dir ) / node_name ;
2017-08-01 14:41:00 +00:00
2018-08-25 01:58:14 +00:00
Coordination : : Stat stat ;
2017-08-02 14:42:35 +00:00
String dummy ;
2017-08-01 14:41:00 +00:00
2017-04-19 14:21:27 +00:00
try
{
2017-08-12 17:39:14 +00:00
/// Already deleted
if ( ! zookeeper - > exists ( node_path , & stat ) )
continue ;
2021-02-09 15:14:20 +00:00
if ( ! canRemoveQueueEntry ( node_name , stat ) )
2017-08-12 17:39:14 +00:00
continue ;
2021-02-10 20:30:40 +00:00
/// At first we remove entry/active node to prevent staled hosts from executing entry concurrently
auto rm_active_res = zookeeper - > tryRemove ( fs : : path ( node_path ) / " active " ) ;
if ( rm_active_res ! = Coordination : : Error : : ZOK & & rm_active_res ! = Coordination : : Error : : ZNONODE )
2017-08-12 17:39:14 +00:00
{
2021-02-10 20:30:40 +00:00
if ( rm_active_res = = Coordination : : Error : : ZNOTEMPTY )
LOG_DEBUG ( log , " Task {} should be deleted, but there are active workers. Skipping it. " , node_name ) ;
else
LOG_WARNING ( log , " Unexpected status code {} on attempt to remove {}/active " , rm_active_res , node_name ) ;
2017-08-02 14:42:35 +00:00
continue ;
2017-08-12 17:39:14 +00:00
}
2017-08-02 14:42:35 +00:00
2021-02-10 20:30:40 +00:00
/// Now we can safely delete entry
2021-02-09 15:14:20 +00:00
LOG_INFO ( log , " Task {} is outdated, deleting it " , node_name ) ;
2017-08-12 17:39:14 +00:00
2021-02-11 15:14:38 +00:00
/// We recursively delete all nodes except node_path/finished to prevent staled hosts from
/// creating node_path/active node (see createStatusDirs(...))
2022-04-04 22:51:48 +00:00
zookeeper - > tryRemoveChildrenRecursive ( node_path , /* probably_flat */ false , " finished " ) ;
2017-04-19 14:21:27 +00:00
2021-02-11 15:14:38 +00:00
/// And then we remove node_path and node_path/finished in a single transaction
2021-02-10 20:30:40 +00:00
Coordination : : Requests ops ;
2021-02-11 15:14:38 +00:00
Coordination : : Responses res ;
ops . emplace_back ( zkutil : : makeCheckRequest ( node_path , - 1 ) ) ; /// See a comment below
2021-02-10 20:30:40 +00:00
ops . emplace_back ( zkutil : : makeRemoveRequest ( fs : : path ( node_path ) / " finished " , - 1 ) ) ;
ops . emplace_back ( zkutil : : makeRemoveRequest ( node_path , - 1 ) ) ;
2021-02-11 15:14:38 +00:00
auto rm_entry_res = zookeeper - > tryMulti ( ops , res ) ;
2021-02-16 14:05:58 +00:00
2021-02-11 15:14:38 +00:00
if ( rm_entry_res = = Coordination : : Error : : ZNONODE )
{
/// Most likely both node_path/finished and node_path were removed concurrently.
bool entry_removed_concurrently = res [ 0 ] - > error = = Coordination : : Error : : ZNONODE ;
if ( entry_removed_concurrently )
continue ;
/// Possible rare case: initiator node has lost connection after enqueueing entry and failed to create status dirs.
/// No one has started to process the entry, so node_path/active and node_path/finished nodes were never created, node_path has no children.
/// Entry became outdated, but we cannot remove remove it in a transaction with node_path/finished.
2022-06-29 14:27:21 +00:00
chassert ( res [ 0 ] - > error = = Coordination : : Error : : ZOK & & res [ 1 ] - > error = = Coordination : : Error : : ZNONODE ) ;
2021-02-11 15:14:38 +00:00
rm_entry_res = zookeeper - > tryRemove ( node_path ) ;
2022-06-29 14:27:21 +00:00
chassert ( rm_entry_res ! = Coordination : : Error : : ZNOTEMPTY ) ;
2021-02-11 15:14:38 +00:00
continue ;
}
zkutil : : KeeperMultiException : : check ( rm_entry_res , ops , res ) ;
2017-04-19 14:21:27 +00:00
}
catch ( . . . )
{
2020-08-08 01:01:47 +00:00
LOG_INFO ( log , " An error occurred while checking and cleaning task {} from queue: {} " , node_name , getCurrentExceptionMessage ( false ) ) ;
2017-04-19 14:21:27 +00:00
}
}
}
2021-02-09 15:14:20 +00:00
bool DDLWorker : : canRemoveQueueEntry ( const String & entry_name , const Coordination : : Stat & stat )
{
/// Delete node if its lifetime is expired (according to task_max_lifetime parameter)
constexpr UInt64 zookeeper_time_resolution = 1000 ;
Int64 zookeeper_time_seconds = stat . ctime / zookeeper_time_resolution ;
bool node_lifetime_is_expired = zookeeper_time_seconds + task_max_lifetime < Poco : : Timestamp ( ) . epochTime ( ) ;
/// If too many nodes in task queue (> max_tasks_in_queue), delete oldest one
UInt32 entry_number = DDLTaskBase : : getLogEntryNumber ( entry_name ) ;
2021-02-10 20:30:40 +00:00
bool node_is_outside_max_window = entry_number + max_tasks_in_queue < max_id . load ( std : : memory_order_relaxed ) ;
2021-02-09 15:14:20 +00:00
return node_lifetime_is_expired | | node_is_outside_max_window ;
}
2017-04-13 16:12:56 +00:00
2017-08-11 20:20:15 +00:00
/// Try to create nonexisting "status" dirs for a node
2019-02-22 16:14:48 +00:00
void DDLWorker : : createStatusDirs ( const std : : string & node_path , const ZooKeeperPtr & zookeeper )
2017-04-13 13:42:29 +00:00
{
2018-08-25 01:58:14 +00:00
Coordination : : Requests ops ;
2021-02-10 20:30:40 +00:00
ops . emplace_back ( zkutil : : makeCreateRequest ( fs : : path ( node_path ) / " active " , { } , zkutil : : CreateMode : : Persistent ) ) ;
ops . emplace_back ( zkutil : : makeCreateRequest ( fs : : path ( node_path ) / " finished " , { } , zkutil : : CreateMode : : Persistent ) ) ;
2018-08-25 01:58:14 +00:00
Coordination : : Responses responses ;
2020-06-12 15:09:12 +00:00
Coordination : : Error code = zookeeper - > tryMulti ( ops , responses ) ;
2021-02-11 15:14:38 +00:00
2021-02-10 20:30:40 +00:00
bool both_created = code = = Coordination : : Error : : ZOK ;
2021-02-11 15:14:38 +00:00
/// Failed on attempt to create node_path/active because it exists, so node_path/finished must exist too
2021-02-10 20:30:40 +00:00
bool both_already_exists = responses . size ( ) = = 2 & & responses [ 0 ] - > error = = Coordination : : Error : : ZNODEEXISTS
2021-02-11 15:14:38 +00:00
& & responses [ 1 ] - > error = = Coordination : : Error : : ZRUNTIMEINCONSISTENCY ;
assert ( ! both_already_exists | | ( zookeeper - > exists ( fs : : path ( node_path ) / " active " ) & & zookeeper - > exists ( fs : : path ( node_path ) / " finished " ) ) ) ;
/// Failed on attempt to create node_path/finished, but node_path/active does not exist
bool is_currently_deleting = responses . size ( ) = = 2 & & responses [ 0 ] - > error = = Coordination : : Error : : ZOK
& & responses [ 1 ] - > error = = Coordination : : Error : : ZNODEEXISTS ;
2021-02-10 20:30:40 +00:00
if ( both_created | | both_already_exists )
return ;
2021-02-11 15:14:38 +00:00
if ( is_currently_deleting )
2021-02-16 14:05:58 +00:00
{
cleanup_event - > set ( ) ;
2021-02-11 15:14:38 +00:00
throw Exception ( ErrorCodes : : UNFINISHED , " Cannot create status dirs for {}, "
" most likely because someone is deleting it concurrently " , node_path ) ;
2021-02-16 14:05:58 +00:00
}
2021-02-11 15:14:38 +00:00
/// Connection lost or entry was removed
assert ( Coordination : : isHardwareError ( code ) | | code = = Coordination : : Error : : ZNONODE ) ;
zkutil : : KeeperMultiException : : check ( code , ops , responses ) ;
2017-04-17 17:04:31 +00:00
}
2017-04-13 13:42:29 +00:00
2017-04-18 15:44:31 +00:00
String DDLWorker : : enqueueQuery ( DDLLogEntry & entry )
2017-04-13 16:12:56 +00:00
{
2020-11-27 14:04:03 +00:00
if ( entry . hosts . empty ( ) )
2017-08-02 14:42:35 +00:00
throw Exception ( " Empty host list in a distributed DDL task " , ErrorCodes : : LOGICAL_ERROR ) ;
2017-04-13 16:12:56 +00:00
2019-02-22 16:14:48 +00:00
auto zookeeper = getAndSetZooKeeper ( ) ;
2020-12-03 17:53:55 +00:00
String query_path_prefix = fs : : path ( queue_dir ) / " query- " ;
2017-04-17 17:04:31 +00:00
zookeeper - > createAncestors ( query_path_prefix ) ;
2017-04-13 16:12:56 +00:00
2020-11-27 14:04:03 +00:00
String node_path = zookeeper - > create ( query_path_prefix , entry . toString ( ) , zkutil : : CreateMode : : PersistentSequential ) ;
2021-08-11 03:40:06 +00:00
if ( max_pushed_entry_metric )
{
String str_buf = node_path . substr ( query_path_prefix . length ( ) ) ;
DB : : ReadBufferFromString in ( str_buf ) ;
CurrentMetrics : : Metric id ;
readText ( id , in ) ;
id = std : : max ( * max_pushed_entry_metric , id ) ;
CurrentMetrics : : set ( * max_pushed_entry_metric , id ) ;
}
2017-08-02 14:42:35 +00:00
2020-12-03 18:14:27 +00:00
/// We cannot create status dirs in a single transaction with previous request,
/// because we don't know node_path until previous request is executed.
/// Se we try to create status dirs here or later when we will execute entry.
2017-08-02 14:42:35 +00:00
try
{
2019-02-22 16:14:48 +00:00
createStatusDirs ( node_path , zookeeper ) ;
2017-08-02 14:42:35 +00:00
}
catch ( . . . )
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " An error occurred while creating auxiliary ZooKeeper directories in {} . They will be created later. Error : {} " , node_path , getCurrentExceptionMessage ( true ) ) ;
2017-08-02 14:42:35 +00:00
}
2017-04-18 15:44:31 +00:00
return node_path ;
2017-04-13 13:42:29 +00:00
}
2017-04-13 16:12:56 +00:00
DDLWorker: avoid NULL dereference on termination and failed zookeeper initialization
Log snipped shows the problem:
2021.02.24 04:40:29.349181 [ 39 ] {} <Warning> DDLWorker: DDLWorker is configured to use multiple threads. It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear.
2021.02.24 04:40:29.349516 [ 39 ] {} <Information> Application: Ready for connections.
2021.02.24 04:40:29.349602 [ 74 ] {} <Debug> DDLWorker: Started DDLWorker cleanup thread
2021.02.24 04:40:29.349639 [ 73 ] {} <Debug> DDLWorker: Starting DDLWorker thread
2021.02.24 04:40:29.349698 [ 73 ] {} <Debug> DDLWorker: Started DDLWorker thread
2021.02.24 04:40:29.352548 [ 73 ] {} <Error> virtual void DB::DDLWorker::initializeMainThread(): Code: 999, e.displayText() = Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
(Connection loss), Stack trace (when copying this message, always include the lines below):
0. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error, int) @ 0xfe93923 in /usr/bin/clickhouse
1. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error) @ 0xfe93ba2 in /usr/bin/clickhouse
2. Coordination::ZooKeeper::connect(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, Poco::Timespan) @ 0xfed3a01 in /usr/bin/clickhouse
3. Coordination::ZooKeeper::ZooKeeper(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Poco::Timespan, Poco::Timespan, Poco::Timespan) @ 0xfed2222 in /usr/bin/clickhouse
4. zkutil::ZooKeeper::init(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe961cd in /usr/bin/clickhouse
5. zkutil::ZooKeeper::ZooKeeper(Poco::Util::AbstractConfiguration const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe97a96 in /usr/bin/clickhouse
6. void std::__1::allocator_traits<std::__1::allocator<zkutil::ZooKeeper> >::__construct<zkutil::ZooKeeper, Poco::Util::AbstractConfiguration const&, char const (&) [10]>(std::__1::integral_constant<bool, true>, std::__1::allocator<zkutil::ZooKeeper>&, zkutil::ZooKeeper*, Poco::Util::AbstractConfiguration const&, char const (&) [10]) @ 0xed98387 in /usr/bin/clickhouse
7. DB::Context::getZooKeeper() const @ 0xed75190 in /usr/bin/clickhouse
8. DB::DDLWorker::getAndSetZooKeeper() @ 0xedb81c9 in /usr/bin/clickhouse
9. DB::DDLWorker::initializeMainThread() @ 0xedc9eb0 in /usr/bin/clickhouse
10. DB::DDLWorker::runMainThread() @ 0xedb5d01 in /usr/bin/clickhouse
11. ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::DDLWorker::*)(), DB::DDLWorker*>(void (DB::DDLWorker::*&&)(), DB::DDLWorker*&&)::'lambda'()::operator()() @ 0xedcafa1 in /usr/bin/clickhouse
12. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x892651f in /usr/bin/clickhouse
13. ? @ 0x8929fb3 in /usr/bin/clickhouse
14. start_thread @ 0x8ea7 in /lib/x86_64-linux-gnu/libpthread-2.31.so
15. __clone @ 0xfddef in /lib/x86_64-linux-gnu/libc-2.31.so
(version 21.3.1.1)
...
2021.02.24 04:40:30.025278 [ 41 ] {} <Trace> BaseDaemon: Received signal 15
2021.02.24 04:40:30.025336 [ 41 ] {} <Information> Application: Received termination signal (Terminated)
...
2021.02.24 04:40:30.582078 [ 39 ] {} <Information> Application: Closed all listening sockets.
2021.02.24 04:40:30.582124 [ 39 ] {} <Information> Application: Closed connections.
2021.02.24 04:40:30.583770 [ 39 ] {} <Information> Application: Shutting down storages.
2021.02.24 04:40:30.583932 [ 39 ] {} <Information> Context: Shutdown disk data
2021.02.24 04:40:30.583951 [ 39 ] {} <Information> Context: Shutdown disk default
2021.02.24 04:40:30.584163 [ 46 ] {} <Trace> SystemLog (system.query_log): Terminating
2021.02.24 04:40:30.586025 [ 39 ] {} <Trace> BackgroundSchedulePool/BgSchPool: Waiting for threads to finish.
2021.02.24 04:40:34.352701 [ 73 ] {} <Debug> DDLWorker: Initialized DDLWorker thread
2021.02.24 04:40:34.352758 [ 73 ] {} <Debug> DDLWorker: Scheduling tasks
2021-02-24 05:07:31 +00:00
bool DDLWorker : : initializeMainThread ( )
2017-04-13 13:42:29 +00:00
{
2022-06-29 14:27:21 +00:00
chassert ( ! initialized ) ;
2017-04-27 15:19:11 +00:00
setThreadName ( " DDLWorker " ) ;
DDLWorker: avoid NULL dereference on termination and failed zookeeper initialization
Log snipped shows the problem:
2021.02.24 04:40:29.349181 [ 39 ] {} <Warning> DDLWorker: DDLWorker is configured to use multiple threads. It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear.
2021.02.24 04:40:29.349516 [ 39 ] {} <Information> Application: Ready for connections.
2021.02.24 04:40:29.349602 [ 74 ] {} <Debug> DDLWorker: Started DDLWorker cleanup thread
2021.02.24 04:40:29.349639 [ 73 ] {} <Debug> DDLWorker: Starting DDLWorker thread
2021.02.24 04:40:29.349698 [ 73 ] {} <Debug> DDLWorker: Started DDLWorker thread
2021.02.24 04:40:29.352548 [ 73 ] {} <Error> virtual void DB::DDLWorker::initializeMainThread(): Code: 999, e.displayText() = Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
(Connection loss), Stack trace (when copying this message, always include the lines below):
0. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error, int) @ 0xfe93923 in /usr/bin/clickhouse
1. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error) @ 0xfe93ba2 in /usr/bin/clickhouse
2. Coordination::ZooKeeper::connect(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, Poco::Timespan) @ 0xfed3a01 in /usr/bin/clickhouse
3. Coordination::ZooKeeper::ZooKeeper(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Poco::Timespan, Poco::Timespan, Poco::Timespan) @ 0xfed2222 in /usr/bin/clickhouse
4. zkutil::ZooKeeper::init(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe961cd in /usr/bin/clickhouse
5. zkutil::ZooKeeper::ZooKeeper(Poco::Util::AbstractConfiguration const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe97a96 in /usr/bin/clickhouse
6. void std::__1::allocator_traits<std::__1::allocator<zkutil::ZooKeeper> >::__construct<zkutil::ZooKeeper, Poco::Util::AbstractConfiguration const&, char const (&) [10]>(std::__1::integral_constant<bool, true>, std::__1::allocator<zkutil::ZooKeeper>&, zkutil::ZooKeeper*, Poco::Util::AbstractConfiguration const&, char const (&) [10]) @ 0xed98387 in /usr/bin/clickhouse
7. DB::Context::getZooKeeper() const @ 0xed75190 in /usr/bin/clickhouse
8. DB::DDLWorker::getAndSetZooKeeper() @ 0xedb81c9 in /usr/bin/clickhouse
9. DB::DDLWorker::initializeMainThread() @ 0xedc9eb0 in /usr/bin/clickhouse
10. DB::DDLWorker::runMainThread() @ 0xedb5d01 in /usr/bin/clickhouse
11. ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::DDLWorker::*)(), DB::DDLWorker*>(void (DB::DDLWorker::*&&)(), DB::DDLWorker*&&)::'lambda'()::operator()() @ 0xedcafa1 in /usr/bin/clickhouse
12. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x892651f in /usr/bin/clickhouse
13. ? @ 0x8929fb3 in /usr/bin/clickhouse
14. start_thread @ 0x8ea7 in /lib/x86_64-linux-gnu/libpthread-2.31.so
15. __clone @ 0xfddef in /lib/x86_64-linux-gnu/libc-2.31.so
(version 21.3.1.1)
...
2021.02.24 04:40:30.025278 [ 41 ] {} <Trace> BaseDaemon: Received signal 15
2021.02.24 04:40:30.025336 [ 41 ] {} <Information> Application: Received termination signal (Terminated)
...
2021.02.24 04:40:30.582078 [ 39 ] {} <Information> Application: Closed all listening sockets.
2021.02.24 04:40:30.582124 [ 39 ] {} <Information> Application: Closed connections.
2021.02.24 04:40:30.583770 [ 39 ] {} <Information> Application: Shutting down storages.
2021.02.24 04:40:30.583932 [ 39 ] {} <Information> Context: Shutdown disk data
2021.02.24 04:40:30.583951 [ 39 ] {} <Information> Context: Shutdown disk default
2021.02.24 04:40:30.584163 [ 46 ] {} <Trace> SystemLog (system.query_log): Terminating
2021.02.24 04:40:30.586025 [ 39 ] {} <Trace> BackgroundSchedulePool/BgSchPool: Waiting for threads to finish.
2021.02.24 04:40:34.352701 [ 73 ] {} <Debug> DDLWorker: Initialized DDLWorker thread
2021.02.24 04:40:34.352758 [ 73 ] {} <Debug> DDLWorker: Scheduling tasks
2021-02-24 05:07:31 +00:00
LOG_DEBUG ( log , " Initializing DDLWorker thread " ) ;
2017-04-18 15:44:31 +00:00
2021-01-26 17:51:25 +00:00
while ( ! stop_flag )
2017-07-31 18:57:13 +00:00
{
2017-08-01 14:41:00 +00:00
try
{
2019-12-27 19:30:13 +00:00
auto zookeeper = getAndSetZooKeeper ( ) ;
2020-12-03 17:53:55 +00:00
zookeeper - > createAncestors ( fs : : path ( queue_dir ) / " " ) ;
2019-12-27 19:30:13 +00:00
initialized = true ;
DDLWorker: avoid NULL dereference on termination and failed zookeeper initialization
Log snipped shows the problem:
2021.02.24 04:40:29.349181 [ 39 ] {} <Warning> DDLWorker: DDLWorker is configured to use multiple threads. It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear.
2021.02.24 04:40:29.349516 [ 39 ] {} <Information> Application: Ready for connections.
2021.02.24 04:40:29.349602 [ 74 ] {} <Debug> DDLWorker: Started DDLWorker cleanup thread
2021.02.24 04:40:29.349639 [ 73 ] {} <Debug> DDLWorker: Starting DDLWorker thread
2021.02.24 04:40:29.349698 [ 73 ] {} <Debug> DDLWorker: Started DDLWorker thread
2021.02.24 04:40:29.352548 [ 73 ] {} <Error> virtual void DB::DDLWorker::initializeMainThread(): Code: 999, e.displayText() = Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
(Connection loss), Stack trace (when copying this message, always include the lines below):
0. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error, int) @ 0xfe93923 in /usr/bin/clickhouse
1. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error) @ 0xfe93ba2 in /usr/bin/clickhouse
2. Coordination::ZooKeeper::connect(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, Poco::Timespan) @ 0xfed3a01 in /usr/bin/clickhouse
3. Coordination::ZooKeeper::ZooKeeper(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Poco::Timespan, Poco::Timespan, Poco::Timespan) @ 0xfed2222 in /usr/bin/clickhouse
4. zkutil::ZooKeeper::init(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe961cd in /usr/bin/clickhouse
5. zkutil::ZooKeeper::ZooKeeper(Poco::Util::AbstractConfiguration const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe97a96 in /usr/bin/clickhouse
6. void std::__1::allocator_traits<std::__1::allocator<zkutil::ZooKeeper> >::__construct<zkutil::ZooKeeper, Poco::Util::AbstractConfiguration const&, char const (&) [10]>(std::__1::integral_constant<bool, true>, std::__1::allocator<zkutil::ZooKeeper>&, zkutil::ZooKeeper*, Poco::Util::AbstractConfiguration const&, char const (&) [10]) @ 0xed98387 in /usr/bin/clickhouse
7. DB::Context::getZooKeeper() const @ 0xed75190 in /usr/bin/clickhouse
8. DB::DDLWorker::getAndSetZooKeeper() @ 0xedb81c9 in /usr/bin/clickhouse
9. DB::DDLWorker::initializeMainThread() @ 0xedc9eb0 in /usr/bin/clickhouse
10. DB::DDLWorker::runMainThread() @ 0xedb5d01 in /usr/bin/clickhouse
11. ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::DDLWorker::*)(), DB::DDLWorker*>(void (DB::DDLWorker::*&&)(), DB::DDLWorker*&&)::'lambda'()::operator()() @ 0xedcafa1 in /usr/bin/clickhouse
12. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x892651f in /usr/bin/clickhouse
13. ? @ 0x8929fb3 in /usr/bin/clickhouse
14. start_thread @ 0x8ea7 in /lib/x86_64-linux-gnu/libpthread-2.31.so
15. __clone @ 0xfddef in /lib/x86_64-linux-gnu/libc-2.31.so
(version 21.3.1.1)
...
2021.02.24 04:40:30.025278 [ 41 ] {} <Trace> BaseDaemon: Received signal 15
2021.02.24 04:40:30.025336 [ 41 ] {} <Information> Application: Received termination signal (Terminated)
...
2021.02.24 04:40:30.582078 [ 39 ] {} <Information> Application: Closed all listening sockets.
2021.02.24 04:40:30.582124 [ 39 ] {} <Information> Application: Closed connections.
2021.02.24 04:40:30.583770 [ 39 ] {} <Information> Application: Shutting down storages.
2021.02.24 04:40:30.583932 [ 39 ] {} <Information> Context: Shutdown disk data
2021.02.24 04:40:30.583951 [ 39 ] {} <Information> Context: Shutdown disk default
2021.02.24 04:40:30.584163 [ 46 ] {} <Trace> SystemLog (system.query_log): Terminating
2021.02.24 04:40:30.586025 [ 39 ] {} <Trace> BackgroundSchedulePool/BgSchPool: Waiting for threads to finish.
2021.02.24 04:40:34.352701 [ 73 ] {} <Debug> DDLWorker: Initialized DDLWorker thread
2021.02.24 04:40:34.352758 [ 73 ] {} <Debug> DDLWorker: Scheduling tasks
2021-02-24 05:07:31 +00:00
return true ;
2019-12-27 19:30:13 +00:00
}
catch ( const Coordination : : Exception & e )
{
if ( ! Coordination : : isHardwareError ( e . code ) )
2020-12-21 02:05:23 +00:00
{
/// A logical error.
2021-02-10 20:30:40 +00:00
LOG_ERROR ( log , " ZooKeeper error: {}. Failed to start DDLWorker. " , getCurrentExceptionMessage ( true ) ) ;
2022-06-29 14:27:21 +00:00
chassert ( false ) ; /// Catch such failures in tests with debug build
2020-12-21 02:05:23 +00:00
}
2018-11-10 17:52:25 +00:00
2019-12-27 19:30:13 +00:00
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
2017-08-01 14:41:00 +00:00
}
catch ( . . . )
{
2020-12-30 12:25:00 +00:00
tryLogCurrentException ( log , " Cannot initialize DDL queue. " ) ;
2017-08-01 14:41:00 +00:00
}
2021-01-26 17:51:25 +00:00
/// Avoid busy loop when ZooKeeper is not available.
sleepForSeconds ( 5 ) ;
2019-02-22 18:36:36 +00:00
}
DDLWorker: avoid NULL dereference on termination and failed zookeeper initialization
Log snipped shows the problem:
2021.02.24 04:40:29.349181 [ 39 ] {} <Warning> DDLWorker: DDLWorker is configured to use multiple threads. It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear.
2021.02.24 04:40:29.349516 [ 39 ] {} <Information> Application: Ready for connections.
2021.02.24 04:40:29.349602 [ 74 ] {} <Debug> DDLWorker: Started DDLWorker cleanup thread
2021.02.24 04:40:29.349639 [ 73 ] {} <Debug> DDLWorker: Starting DDLWorker thread
2021.02.24 04:40:29.349698 [ 73 ] {} <Debug> DDLWorker: Started DDLWorker thread
2021.02.24 04:40:29.352548 [ 73 ] {} <Error> virtual void DB::DDLWorker::initializeMainThread(): Code: 999, e.displayText() = Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
(Connection loss), Stack trace (when copying this message, always include the lines below):
0. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error, int) @ 0xfe93923 in /usr/bin/clickhouse
1. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error) @ 0xfe93ba2 in /usr/bin/clickhouse
2. Coordination::ZooKeeper::connect(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, Poco::Timespan) @ 0xfed3a01 in /usr/bin/clickhouse
3. Coordination::ZooKeeper::ZooKeeper(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Poco::Timespan, Poco::Timespan, Poco::Timespan) @ 0xfed2222 in /usr/bin/clickhouse
4. zkutil::ZooKeeper::init(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe961cd in /usr/bin/clickhouse
5. zkutil::ZooKeeper::ZooKeeper(Poco::Util::AbstractConfiguration const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe97a96 in /usr/bin/clickhouse
6. void std::__1::allocator_traits<std::__1::allocator<zkutil::ZooKeeper> >::__construct<zkutil::ZooKeeper, Poco::Util::AbstractConfiguration const&, char const (&) [10]>(std::__1::integral_constant<bool, true>, std::__1::allocator<zkutil::ZooKeeper>&, zkutil::ZooKeeper*, Poco::Util::AbstractConfiguration const&, char const (&) [10]) @ 0xed98387 in /usr/bin/clickhouse
7. DB::Context::getZooKeeper() const @ 0xed75190 in /usr/bin/clickhouse
8. DB::DDLWorker::getAndSetZooKeeper() @ 0xedb81c9 in /usr/bin/clickhouse
9. DB::DDLWorker::initializeMainThread() @ 0xedc9eb0 in /usr/bin/clickhouse
10. DB::DDLWorker::runMainThread() @ 0xedb5d01 in /usr/bin/clickhouse
11. ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::DDLWorker::*)(), DB::DDLWorker*>(void (DB::DDLWorker::*&&)(), DB::DDLWorker*&&)::'lambda'()::operator()() @ 0xedcafa1 in /usr/bin/clickhouse
12. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x892651f in /usr/bin/clickhouse
13. ? @ 0x8929fb3 in /usr/bin/clickhouse
14. start_thread @ 0x8ea7 in /lib/x86_64-linux-gnu/libpthread-2.31.so
15. __clone @ 0xfddef in /lib/x86_64-linux-gnu/libc-2.31.so
(version 21.3.1.1)
...
2021.02.24 04:40:30.025278 [ 41 ] {} <Trace> BaseDaemon: Received signal 15
2021.02.24 04:40:30.025336 [ 41 ] {} <Information> Application: Received termination signal (Terminated)
...
2021.02.24 04:40:30.582078 [ 39 ] {} <Information> Application: Closed all listening sockets.
2021.02.24 04:40:30.582124 [ 39 ] {} <Information> Application: Closed connections.
2021.02.24 04:40:30.583770 [ 39 ] {} <Information> Application: Shutting down storages.
2021.02.24 04:40:30.583932 [ 39 ] {} <Information> Context: Shutdown disk data
2021.02.24 04:40:30.583951 [ 39 ] {} <Information> Context: Shutdown disk default
2021.02.24 04:40:30.584163 [ 46 ] {} <Trace> SystemLog (system.query_log): Terminating
2021.02.24 04:40:30.586025 [ 39 ] {} <Trace> BackgroundSchedulePool/BgSchPool: Waiting for threads to finish.
2021.02.24 04:40:34.352701 [ 73 ] {} <Debug> DDLWorker: Initialized DDLWorker thread
2021.02.24 04:40:34.352758 [ 73 ] {} <Debug> DDLWorker: Scheduling tasks
2021-02-24 05:07:31 +00:00
return false ;
2020-12-03 18:14:27 +00:00
}
void DDLWorker : : runMainThread ( )
{
2021-01-26 17:51:25 +00:00
auto reset_state = [ & ] ( )
2021-01-18 14:09:39 +00:00
{
initialized = false ;
/// It will wait for all threads in pool to finish and will not rethrow exceptions (if any).
/// We create new thread pool to forget previous exceptions.
2021-01-26 17:51:25 +00:00
if ( 1 < pool_size )
2021-01-18 14:09:39 +00:00
worker_pool = std : : make_unique < ThreadPool > ( pool_size ) ;
/// Clear other in-memory state, like server just started.
current_tasks . clear ( ) ;
2021-02-09 15:14:20 +00:00
last_skipped_entry_name . reset ( ) ;
2021-01-18 14:09:39 +00:00
max_id = 0 ;
2021-02-12 16:22:01 +00:00
LOG_INFO ( log , " Cleaned DDLWorker state " ) ;
2021-01-18 14:09:39 +00:00
} ;
2020-12-03 18:14:27 +00:00
setThreadName ( " DDLWorker " ) ;
LOG_DEBUG ( log , " Starting DDLWorker thread " ) ;
2017-08-01 14:41:00 +00:00
2017-04-13 13:42:29 +00:00
while ( ! stop_flag )
{
try
{
2021-02-23 05:26:46 +00:00
bool reinitialized = ! initialized ;
2020-12-03 18:14:27 +00:00
/// Reinitialize DDLWorker state (including ZooKeeper connection) if required
if ( ! initialized )
{
DDLWorker: avoid NULL dereference on termination and failed zookeeper initialization
Log snipped shows the problem:
2021.02.24 04:40:29.349181 [ 39 ] {} <Warning> DDLWorker: DDLWorker is configured to use multiple threads. It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear.
2021.02.24 04:40:29.349516 [ 39 ] {} <Information> Application: Ready for connections.
2021.02.24 04:40:29.349602 [ 74 ] {} <Debug> DDLWorker: Started DDLWorker cleanup thread
2021.02.24 04:40:29.349639 [ 73 ] {} <Debug> DDLWorker: Starting DDLWorker thread
2021.02.24 04:40:29.349698 [ 73 ] {} <Debug> DDLWorker: Started DDLWorker thread
2021.02.24 04:40:29.352548 [ 73 ] {} <Error> virtual void DB::DDLWorker::initializeMainThread(): Code: 999, e.displayText() = Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
Poco::Exception. Code: 1000, e.code() = 111, e.displayText() = Connection refused (version 21.3.1.1), 192.168.112.3:2181
(Connection loss), Stack trace (when copying this message, always include the lines below):
0. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error, int) @ 0xfe93923 in /usr/bin/clickhouse
1. Coordination::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Coordination::Error) @ 0xfe93ba2 in /usr/bin/clickhouse
2. Coordination::ZooKeeper::connect(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, Poco::Timespan) @ 0xfed3a01 in /usr/bin/clickhouse
3. Coordination::ZooKeeper::ZooKeeper(std::__1::vector<Coordination::ZooKeeper::Node, std::__1::allocator<Coordination::ZooKeeper::Node> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, Poco::Timespan, Poco::Timespan, Poco::Timespan) @ 0xfed2222 in /usr/bin/clickhouse
4. zkutil::ZooKeeper::init(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, int, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe961cd in /usr/bin/clickhouse
5. zkutil::ZooKeeper::ZooKeeper(Poco::Util::AbstractConfiguration const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfe97a96 in /usr/bin/clickhouse
6. void std::__1::allocator_traits<std::__1::allocator<zkutil::ZooKeeper> >::__construct<zkutil::ZooKeeper, Poco::Util::AbstractConfiguration const&, char const (&) [10]>(std::__1::integral_constant<bool, true>, std::__1::allocator<zkutil::ZooKeeper>&, zkutil::ZooKeeper*, Poco::Util::AbstractConfiguration const&, char const (&) [10]) @ 0xed98387 in /usr/bin/clickhouse
7. DB::Context::getZooKeeper() const @ 0xed75190 in /usr/bin/clickhouse
8. DB::DDLWorker::getAndSetZooKeeper() @ 0xedb81c9 in /usr/bin/clickhouse
9. DB::DDLWorker::initializeMainThread() @ 0xedc9eb0 in /usr/bin/clickhouse
10. DB::DDLWorker::runMainThread() @ 0xedb5d01 in /usr/bin/clickhouse
11. ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::DDLWorker::*)(), DB::DDLWorker*>(void (DB::DDLWorker::*&&)(), DB::DDLWorker*&&)::'lambda'()::operator()() @ 0xedcafa1 in /usr/bin/clickhouse
12. ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) @ 0x892651f in /usr/bin/clickhouse
13. ? @ 0x8929fb3 in /usr/bin/clickhouse
14. start_thread @ 0x8ea7 in /lib/x86_64-linux-gnu/libpthread-2.31.so
15. __clone @ 0xfddef in /lib/x86_64-linux-gnu/libc-2.31.so
(version 21.3.1.1)
...
2021.02.24 04:40:30.025278 [ 41 ] {} <Trace> BaseDaemon: Received signal 15
2021.02.24 04:40:30.025336 [ 41 ] {} <Information> Application: Received termination signal (Terminated)
...
2021.02.24 04:40:30.582078 [ 39 ] {} <Information> Application: Closed all listening sockets.
2021.02.24 04:40:30.582124 [ 39 ] {} <Information> Application: Closed connections.
2021.02.24 04:40:30.583770 [ 39 ] {} <Information> Application: Shutting down storages.
2021.02.24 04:40:30.583932 [ 39 ] {} <Information> Context: Shutdown disk data
2021.02.24 04:40:30.583951 [ 39 ] {} <Information> Context: Shutdown disk default
2021.02.24 04:40:30.584163 [ 46 ] {} <Trace> SystemLog (system.query_log): Terminating
2021.02.24 04:40:30.586025 [ 39 ] {} <Trace> BackgroundSchedulePool/BgSchPool: Waiting for threads to finish.
2021.02.24 04:40:34.352701 [ 73 ] {} <Debug> DDLWorker: Initialized DDLWorker thread
2021.02.24 04:40:34.352758 [ 73 ] {} <Debug> DDLWorker: Scheduling tasks
2021-02-24 05:07:31 +00:00
/// Stopped
if ( ! initializeMainThread ( ) )
break ;
2020-12-03 18:14:27 +00:00
LOG_DEBUG ( log , " Initialized DDLWorker thread " ) ;
}
2018-09-28 15:30:03 +00:00
2019-02-22 18:36:36 +00:00
cleanup_event - > set ( ) ;
2021-02-23 05:26:46 +00:00
scheduleTasks ( reinitialized ) ;
2017-04-13 13:42:29 +00:00
2021-11-14 14:41:55 +00:00
LOG_DEBUG ( log , " Waiting for queue updates " ) ;
2021-09-01 09:18:28 +00:00
queue_updated_event - > wait ( ) ;
2017-04-19 14:21:27 +00:00
}
2018-09-24 18:44:09 +00:00
catch ( const Coordination : : Exception & e )
2017-04-27 15:19:11 +00:00
{
2018-08-25 01:58:14 +00:00
if ( Coordination : : isHardwareError ( e . code ) )
2017-07-28 16:14:49 +00:00
{
2020-12-03 18:14:27 +00:00
initialized = false ;
2021-02-23 05:22:08 +00:00
/// Wait for pending async tasks
if ( 1 < pool_size )
worker_pool = std : : make_unique < ThreadPool > ( pool_size ) ;
2021-01-26 17:51:25 +00:00
LOG_INFO ( log , " Lost ZooKeeper connection, will try to connect again: {} " , getCurrentExceptionMessage ( true ) ) ;
2017-07-28 16:14:49 +00:00
}
else
{
2021-01-26 17:51:25 +00:00
LOG_ERROR ( log , " Unexpected ZooKeeper error, will try to restart main thread: {} " , getCurrentExceptionMessage ( true ) ) ;
2020-12-30 12:25:00 +00:00
reset_state ( ) ;
2017-07-28 16:14:49 +00:00
}
2021-01-26 17:51:25 +00:00
sleepForSeconds ( 1 ) ;
2017-04-27 15:19:11 +00:00
}
2017-04-19 14:21:27 +00:00
catch ( . . . )
{
2021-12-12 02:24:47 +00:00
tryLogCurrentException ( log , " Unexpected error, will try to restart main thread " ) ;
2020-12-30 12:25:00 +00:00
reset_state ( ) ;
2021-01-25 18:59:23 +00:00
sleepForSeconds ( 5 ) ;
2017-04-19 14:21:27 +00:00
}
2017-04-13 13:42:29 +00:00
}
}
2017-04-13 16:12:56 +00:00
2019-02-22 18:36:36 +00:00
void DDLWorker : : runCleanupThread ( )
{
setThreadName ( " DDLWorkerClnr " ) ;
2020-05-23 22:24:01 +00:00
LOG_DEBUG ( log , " Started DDLWorker cleanup thread " ) ;
2019-02-22 18:36:36 +00:00
Int64 last_cleanup_time_seconds = 0 ;
while ( ! stop_flag )
{
try
{
cleanup_event - > wait ( ) ;
if ( stop_flag )
break ;
Int64 current_time_seconds = Poco : : Timestamp ( ) . epochTime ( ) ;
if ( last_cleanup_time_seconds & & current_time_seconds < last_cleanup_time_seconds + cleanup_delay_period )
{
2020-05-23 22:24:01 +00:00
LOG_TRACE ( log , " Too early to clean queue, will do it later. " ) ;
2019-02-22 18:36:36 +00:00
continue ;
}
2020-12-03 18:14:27 +00:00
/// ZooKeeper connection is recovered by main thread. We will wait for it on cleanup_event.
2019-02-22 18:36:36 +00:00
auto zookeeper = tryGetZooKeeper ( ) ;
if ( zookeeper - > expired ( ) )
continue ;
cleanupQueue ( current_time_seconds , zookeeper ) ;
last_cleanup_time_seconds = current_time_seconds ;
}
catch ( . . . )
{
tryLogCurrentException ( log , __PRETTY_FUNCTION__ ) ;
}
}
}
2017-04-13 13:42:29 +00:00
}