More effective retries, fast partition discovering. [#CLICKHOUSE-3346]

This commit is contained in:
Vitaliy Lyudvichenko 2018-03-05 03:47:25 +03:00
parent 17f39ab33f
commit 47d1d4c83c
2 changed files with 257 additions and 216 deletions

View File

@ -103,7 +103,6 @@ String getDatabaseDotTable(const DatabaseAndTableName & db_and_table)
} }
enum class TaskState enum class TaskState
{ {
Started = 0, Started = 0,
@ -111,7 +110,6 @@ enum class TaskState
Unknown Unknown
}; };
/// Used to mark status of shard partition tasks /// Used to mark status of shard partition tasks
struct TaskStateWithOwner struct TaskStateWithOwner
{ {
@ -157,12 +155,12 @@ struct TaskTable;
struct TaskCluster; struct TaskCluster;
struct ClusterPartition; struct ClusterPartition;
using TasksPartition = std::map<String, ShardPartition, std::greater<String>>; using TasksPartition = std::map<String, ShardPartition, std::greater<>>;
using ShardInfo = Cluster::ShardInfo; using ShardInfo = Cluster::ShardInfo;
using TaskShardPtr = std::shared_ptr<TaskShard>; using TaskShardPtr = std::shared_ptr<TaskShard>;
using TasksShard = std::vector<TaskShardPtr>; using TasksShard = std::vector<TaskShardPtr>;
using TasksTable = std::list<TaskTable>; using TasksTable = std::list<TaskTable>;
using ClusterPartitions = std::map<String, ClusterPartition, std::greater<String>>; using ClusterPartitions = std::map<String, ClusterPartition, std::greater<>>;
/// Just destination partition of a shard /// Just destination partition of a shard
@ -209,7 +207,7 @@ struct TaskShard
String getDescription() const; String getDescription() const;
String getHostNameExample() const; String getHostNameExample() const;
/// Used to sort clusters by thier proximity /// Used to sort clusters by their proximity
ShardPriority priority; ShardPriority priority;
/// Column with unique destination partitions (computed from engine_push_partition_key expr.) in the shard /// Column with unique destination partitions (computed from engine_push_partition_key expr.) in the shard
@ -217,7 +215,10 @@ struct TaskShard
/// There is a task for each destination partition /// There is a task for each destination partition
TasksPartition partition_tasks; TasksPartition partition_tasks;
bool partition_tasks_initialized = false;
/// Which partitions have been checked for existence
/// If some partition from this lists is exists, it is in partition_tasks
std::set<String> checked_partitions;
/// Last CREATE TABLE query of the table of the shard /// Last CREATE TABLE query of the table of the shard
ASTPtr current_pull_table_create_query; ASTPtr current_pull_table_create_query;
@ -228,14 +229,10 @@ struct TaskShard
}; };
/// Contains all cluster shards that contain a partition (and sorted by the proximity) /// Contains info about all shards that contain a partition
struct ClusterPartition struct ClusterPartition
{ {
/// Shards having that partition double elapsed_time_seconds = 0;
TasksShard shards;
bool shards_are_initialized = false;
Stopwatch watch;
UInt64 bytes_copied = 0; UInt64 bytes_copied = 0;
UInt64 rows_copied = 0; UInt64 rows_copied = 0;
@ -271,8 +268,7 @@ struct TaskTable
ASTPtr engine_push_ast; ASTPtr engine_push_ast;
ASTPtr engine_push_partition_key_ast; ASTPtr engine_push_partition_key_ast;
/// Local Distributed table used to split data /// A Distributed table definition used to split data
DatabaseAndTableName table_split;
String sharding_key_str; String sharding_key_str;
ASTPtr sharding_key_ast; ASTPtr sharding_key_ast;
ASTPtr engine_split_ast; ASTPtr engine_split_ast;
@ -552,8 +548,6 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati
ParserExpressionWithOptionalAlias parser_expression(false); ParserExpressionWithOptionalAlias parser_expression(false);
sharding_key_ast = parseQuery(parser_expression, sharding_key_str); sharding_key_ast = parseQuery(parser_expression, sharding_key_str);
engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast); engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast);
table_split = DatabaseAndTableName(task_cluster.default_local_database, ".split." + table_id);
} }
where_condition_str = config.getString(table_prefix + "where_condition", ""); where_condition_str = config.getString(table_prefix + "where_condition", "");
@ -766,7 +760,7 @@ public:
} }
template <typename T> template <typename T>
decltype(auto) retry(T && func, size_t max_tries) decltype(auto) retry(T && func, size_t max_tries = 100)
{ {
std::exception_ptr exception; std::exception_ptr exception;
@ -795,7 +789,7 @@ public:
{ {
TaskTable & task_table = task_shard->task_table; TaskTable & task_table = task_shard->task_table;
LOG_INFO(log, "Set up shard " << task_shard->getDescription()); LOG_INFO(log, "Discover partitions of shard " << task_shard->getDescription());
auto get_partitions = [&] () { return getShardPartitions(*task_shard); }; auto get_partitions = [&] () { return getShardPartitions(*task_shard); };
auto existing_partitions_names = retry(get_partitions, 60); auto existing_partitions_names = retry(get_partitions, 60);
@ -854,8 +848,10 @@ public:
} }
for (const String & partition_name : filtered_partitions_names) for (const String & partition_name : filtered_partitions_names)
{
task_shard->partition_tasks.emplace(partition_name, ShardPartition(*task_shard, partition_name)); task_shard->partition_tasks.emplace(partition_name, ShardPartition(*task_shard, partition_name));
task_shard->partition_tasks_initialized = true; task_shard->checked_partitions.emplace(partition_name, true);
}
if (!missing_partitions.empty()) if (!missing_partitions.empty())
{ {
@ -918,137 +914,6 @@ public:
task_descprtion_current_version = version_to_update; task_descprtion_current_version = version_to_update;
} }
static constexpr size_t max_table_tries = 1000;
static constexpr size_t max_partition_tries = 1;
bool tryProcessTable(TaskTable & task_table)
{
/// Process each partition that is present in cluster
for (const String & partition_name : task_table.ordered_partition_names)
{
if (!task_table.cluster_partitions.count(partition_name))
throw Exception("There are no expected partition " + partition_name + ". It is a bug", ErrorCodes::LOGICAL_ERROR);
ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name];
if (cluster_partition.total_tries == 0)
cluster_partition.watch.restart();
else
cluster_partition.watch.start();
SCOPE_EXIT(cluster_partition.watch.stop());
bool partition_is_done = false;
size_t num_partition_tries = 0;
/// Retry partition processing
while (!partition_is_done && num_partition_tries < max_partition_tries)
{
++num_partition_tries;
++cluster_partition.total_tries;
size_t num_successful_shards = 0;
LOG_DEBUG(log, "Processing partition " << partition_name << " for the whole cluster");
if (!cluster_partition.shards_are_initialized)
{
/// Discover partitions of each shard in a deferred manner
/// And copy shard data if current partition is found in it
for (const TaskShardPtr & shard : task_table.all_shards)
{
if (!shard->partition_tasks_initialized)
discoverShardPartitions(shard);
auto it_shard_partition = shard->partition_tasks.find(partition_name);
if (it_shard_partition == shard->partition_tasks.end())
{
LOG_DEBUG(log, "Found that shard " << shard->getDescription() << " does not contain current partition "
<< partition_name);
continue;
}
if (processPartitionTask(it_shard_partition->second))
++num_successful_shards;
/// Populate list of shard having current partition
cluster_partition.shards.emplace_back(shard);
}
cluster_partition.shards_are_initialized = true;
}
else
{
/// Process each source shard having current partition and copy current partition
/// NOTE: shards are sorted by "distance" to current host
for (const TaskShardPtr & shard : cluster_partition.shards)
{
auto it_shard_partition = shard->partition_tasks.find(partition_name);
if (it_shard_partition == shard->partition_tasks.end())
throw Exception("There are no such partition in a shard. This is a bug.", ErrorCodes::LOGICAL_ERROR);
if (processPartitionTask(it_shard_partition->second))
++num_successful_shards;
}
}
try
{
const TasksShard & shards_with_partition = cluster_partition.shards;
partition_is_done = (num_successful_shards == shards_with_partition.size())
&& checkPartitionIsDone(task_table, partition_name, shards_with_partition);
}
catch (...)
{
tryLogCurrentException(log);
partition_is_done = false;
}
if (!partition_is_done)
std::this_thread::sleep_for(default_sleep_time);
}
if (partition_is_done)
{
task_table.finished_cluster_partitions.emplace(partition_name);
task_table.bytes_copied += cluster_partition.bytes_copied;
task_table.rows_copied += cluster_partition.rows_copied;
double elapsed = cluster_partition.watch.elapsedSeconds();
LOG_INFO(log, "It took " << std::fixed << std::setprecision(2) << elapsed << " seconds to copy partition " << partition_name
<< ": " << formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied)
<< " uncompressed bytes and "
<< formatReadableQuantity(cluster_partition.rows_copied) << " rows are copied");
if (cluster_partition.rows_copied)
{
LOG_INFO(log, "Average partition speed: "
<< formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied / elapsed) << " per second.");
}
if (task_table.rows_copied)
{
LOG_INFO(log, "Average table " << task_table.table_id << " speed: "
<< formatReadableSizeWithDecimalSuffix(task_table.bytes_copied / elapsed)
<< " per second.");
}
}
}
size_t required_partitions = task_table.cluster_partitions.size();
size_t finished_partitions = task_table.finished_cluster_partitions.size();
bool table_is_done = task_table.finished_cluster_partitions.size() >= task_table.cluster_partitions.size();
if (!table_is_done)
{
LOG_INFO(log, "Table " + task_table.table_id + " is not processed yet."
<< "Copied " << finished_partitions << " of " << required_partitions << ", will retry");
}
return table_is_done;
}
void process() void process()
{ {
for (TaskTable & task_table : task_cluster->table_tasks) for (TaskTable & task_table : task_cluster->table_tasks)
@ -1068,20 +933,20 @@ public:
/// After partitions of each shard are initialized, initialize cluster partitions /// After partitions of each shard are initialized, initialize cluster partitions
for (const TaskShardPtr & task_shard : task_table.all_shards) for (const TaskShardPtr & task_shard : task_table.all_shards)
{ {
for (const auto & partition : task_shard->partition_tasks) for (const auto & partition_elem : task_shard->partition_tasks)
{ {
const String & partition_name = partition.first; const String & partition_name = partition_elem.first;
ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name]; task_table.cluster_partitions.emplace(partition_name, ClusterPartition{});
cluster_partition.shards.emplace_back(task_shard);
} }
} }
for (auto & partition_elem : task_table.cluster_partitions) for (auto & partition_elem : task_table.cluster_partitions)
{ {
const String & partition_name = partition_elem.first; const String & partition_name = partition_elem.first;
auto & partition = partition_elem.second;
partition.shards_are_initialized = true; for (const TaskShardPtr & task_shard : task_table.all_shards)
task_shard->checked_partitions.emplace(partition_name);
task_table.ordered_partition_names.emplace_back(partition_name); task_table.ordered_partition_names.emplace_back(partition_name);
} }
} }
@ -1128,6 +993,43 @@ public:
copy_fault_probability = copy_fault_probability_; copy_fault_probability = copy_fault_probability_;
} }
protected:
String getWorkersPath() const
{
return task_cluster->task_zookeeper_path + "/task_active_workers";
}
String getCurrentWorkerNodePath() const
{
return getWorkersPath() + "/" + host_id;
}
zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(const zkutil::ZooKeeperPtr & zookeeper,
const String & description)
{
while (true)
{
zkutil::Stat stat;
zookeeper->get(getWorkersPath(), &stat);
if (static_cast<size_t>(stat.numChildren) >= task_cluster->max_workers)
{
LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")"
<< ". Postpone processing " << description);
std::this_thread::sleep_for(default_sleep_time);
updateConfigIfNeeded();
}
else
{
return std::make_shared<zkutil::EphemeralNodeHolder>(getCurrentWorkerNodePath(), *zookeeper, true, false, description);
}
}
}
/** Checks that the whole partition of a table was copied. We should do it carefully due to dirty lock. /** Checks that the whole partition of a table was copied. We should do it carefully due to dirty lock.
* State of some task could be changed during the processing. * State of some task could be changed during the processing.
* We have to ensure that all shards have the finished state and there are no dirty flag. * We have to ensure that all shards have the finished state and there are no dirty flag.
@ -1199,42 +1101,6 @@ public:
return true; return true;
} }
protected:
String getWorkersPath() const
{
return task_cluster->task_zookeeper_path + "/task_active_workers";
}
String getCurrentWorkerNodePath() const
{
return getWorkersPath() + "/" + host_id;
}
zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(const zkutil::ZooKeeperPtr & zookeeper,
const String & description)
{
while (true)
{
zkutil::Stat stat;
zookeeper->get(getWorkersPath(), &stat);
if (static_cast<size_t>(stat.numChildren) >= task_cluster->max_workers)
{
LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")"
<< ". Postpone processing " << description);
std::this_thread::sleep_for(default_sleep_time);
updateConfigIfNeeded();
}
else
{
return std::make_shared<zkutil::EphemeralNodeHolder>(getCurrentWorkerNodePath(), *zookeeper, true, false, description);
}
}
}
/// Removes MATERIALIZED and ALIAS columns from create table query /// Removes MATERIALIZED and ALIAS columns from create table query
static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast) static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
{ {
@ -1356,9 +1222,156 @@ protected:
} }
bool processPartitionTask(ShardPartition & task_partition) static constexpr size_t max_table_tries = 1000;
static constexpr size_t max_shard_partition_tries = 600;
bool tryProcessTable(TaskTable & task_table)
{ {
bool res; /// Process each partition that is present in cluster
for (const String & partition_name : task_table.ordered_partition_names)
{
if (!task_table.cluster_partitions.count(partition_name))
throw Exception("There are no expected partition " + partition_name + ". It is a bug", ErrorCodes::LOGICAL_ERROR);
ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name];
Stopwatch watch;
TasksShard expected_shards;
size_t num_failed_shards = 0;
++cluster_partition.total_tries;
LOG_DEBUG(log, "Processing partition " << partition_name << " for the whole cluster");
/// Process each source shard having current partition and copy current partition
/// NOTE: shards are sorted by "distance" to current host
for (const TaskShardPtr & shard : task_table.all_shards)
{
/// Does shard have a node with current partition?
if (shard->partition_tasks.count(partition_name) == 0)
{
/// If not, did we check existence of that partition previously?
if (shard->checked_partitions.count(partition_name) == 0)
{
auto check_shard_has_partition = [&] () { return checkShardHasPartition(*shard, partition_name); };
bool has_partition = retry(check_shard_has_partition);
shard->checked_partitions.emplace(partition_name);
if (has_partition)
{
shard->partition_tasks.emplace(partition_name, ShardPartition(*shard, partition_name));
LOG_DEBUG(log, "Discovered partition " << partition_name << " in shard " << shard->getDescription());
}
else
{
LOG_DEBUG(log, "Found that shard " << shard->getDescription() << " does not contain current partition " << partition_name);
continue;
}
}
else
{
/// We have already checked that partition, but did not discover it
continue;
}
}
auto it_shard_partition = shard->partition_tasks.find(partition_name);
if (it_shard_partition == shard->partition_tasks.end())
throw Exception("There are no such partition in a shard. This is a bug.", ErrorCodes::LOGICAL_ERROR);
auto & partition = it_shard_partition->second;
expected_shards.emplace_back(shard);
PartitionTaskStatus task_status = PartitionTaskStatus::Error;
for (size_t try_num = 0; try_num < max_shard_partition_tries; ++try_num)
{
task_status = tryProcessPartitionTask(partition);
/// Exit if success
if (task_status == PartitionTaskStatus::Finished)
break;
/// Skip if the task is being processed by someone
if (task_status == PartitionTaskStatus::Active)
break;
/// Repeat on errors
std::this_thread::sleep_for(default_sleep_time);
}
if (task_status == PartitionTaskStatus::Error)
++num_failed_shards;
}
cluster_partition.elapsed_time_seconds += watch.elapsedSeconds();
/// Check that whole cluster partition is done
/// Firstly check number failed partition tasks, than look into ZooKeeper and ensure that each partition is done
bool partition_is_done = num_failed_shards == 0;
try
{
partition_is_done = partition_is_done && checkPartitionIsDone(task_table, partition_name, expected_shards);
}
catch (...)
{
tryLogCurrentException(log);
partition_is_done = false;
}
if (partition_is_done)
{
task_table.finished_cluster_partitions.emplace(partition_name);
task_table.bytes_copied += cluster_partition.bytes_copied;
task_table.rows_copied += cluster_partition.rows_copied;
double elapsed = cluster_partition.elapsed_time_seconds;
LOG_INFO(log, "It took " << std::fixed << std::setprecision(2) << elapsed << " seconds to copy partition " << partition_name
<< ": " << formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied)
<< " uncompressed bytes and "
<< formatReadableQuantity(cluster_partition.rows_copied) << " rows are copied");
if (cluster_partition.rows_copied)
{
LOG_INFO(log, "Average partition speed: "
<< formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied / elapsed) << " per second.");
}
if (task_table.rows_copied)
{
LOG_INFO(log, "Average table " << task_table.table_id << " speed: "
<< formatReadableSizeWithDecimalSuffix(task_table.bytes_copied / elapsed)
<< " per second.");
}
}
}
size_t required_partitions = task_table.cluster_partitions.size();
size_t finished_partitions = task_table.finished_cluster_partitions.size();
bool table_is_done = finished_partitions >= required_partitions;
if (!table_is_done)
{
LOG_INFO(log, "Table " + task_table.table_id + " is not processed yet."
<< "Copied " << finished_partitions << " of " << required_partitions << ", will retry");
}
return table_is_done;
}
/// Execution status of a task
enum class PartitionTaskStatus
{
Active,
Finished,
Error,
};
PartitionTaskStatus tryProcessPartitionTask(ShardPartition & task_partition)
{
PartitionTaskStatus res;
try try
{ {
@ -1367,7 +1380,7 @@ protected:
catch (...) catch (...)
{ {
tryLogCurrentException(log, "An error occurred while processing partition " + task_partition.name); tryLogCurrentException(log, "An error occurred while processing partition " + task_partition.name);
res = false; res = PartitionTaskStatus::Error;
} }
/// At the end of each task check if the config is updated /// At the end of each task check if the config is updated
@ -1383,7 +1396,7 @@ protected:
return res; return res;
} }
bool processPartitionTaskImpl(ShardPartition & task_partition) PartitionTaskStatus processPartitionTaskImpl(ShardPartition & task_partition)
{ {
TaskShard & task_shard = task_partition.task_shard; TaskShard & task_shard = task_partition.task_shard;
TaskTable & task_table = task_shard.task_table; TaskTable & task_table = task_shard.task_table;
@ -1441,7 +1454,7 @@ protected:
tryLogCurrentException(log, "An error occurred while clean partition"); tryLogCurrentException(log, "An error occurred while clean partition");
} }
return false; return PartitionTaskStatus::Error;
} }
/// Create ephemeral node to mark that we are active and process the partition /// Create ephemeral node to mark that we are active and process the partition
@ -1456,7 +1469,7 @@ protected:
if (e.code == ZNODEEXISTS) if (e.code == ZNODEEXISTS)
{ {
LOG_DEBUG(log, "Someone is already processing " << current_task_is_active_path); LOG_DEBUG(log, "Someone is already processing " << current_task_is_active_path);
return false; return PartitionTaskStatus::Active;
} }
throw; throw;
@ -1471,14 +1484,14 @@ protected:
if (status.state == TaskState::Finished) if (status.state == TaskState::Finished)
{ {
LOG_DEBUG(log, "Task " << current_task_status_path << " has been successfully executed by " << status.owner); LOG_DEBUG(log, "Task " << current_task_status_path << " has been successfully executed by " << status.owner);
return true; return PartitionTaskStatus::Finished;
} }
// Task is abandoned, initialize DROP PARTITION // Task is abandoned, initialize DROP PARTITION
LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner); LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner);
create_is_dirty_node(); create_is_dirty_node();
return false; return PartitionTaskStatus::Error;
} }
} }
@ -1514,7 +1527,7 @@ protected:
<< ". Partition will be dropped and refilled."); << ". Partition will be dropped and refilled.");
create_is_dirty_node(); create_is_dirty_node();
return false; return PartitionTaskStatus::Error;
} }
} }
} }
@ -1532,7 +1545,7 @@ protected:
if (info.getFailedOp().getPath() == is_dirty_flag_path) if (info.getFailedOp().getPath() == is_dirty_flag_path)
{ {
LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled"); LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled");
return false; return PartitionTaskStatus::Error;
} }
throw zkutil::KeeperException(info.code, current_task_status_path); throw zkutil::KeeperException(info.code, current_task_status_path);
@ -1660,7 +1673,7 @@ protected:
catch (...) catch (...)
{ {
tryLogCurrentException(log, "An error occurred during copying, partition will be marked as dirty"); tryLogCurrentException(log, "An error occurred during copying, partition will be marked as dirty");
return false; return PartitionTaskStatus::Error;
} }
} }
@ -1678,12 +1691,12 @@ protected:
else else
LOG_INFO(log, "Someone made the node abandoned. Will refill partition. " << zkutil::ZooKeeper::error2string(info.code)); LOG_INFO(log, "Someone made the node abandoned. Will refill partition. " << zkutil::ZooKeeper::error2string(info.code));
return false; return PartitionTaskStatus::Error;
} }
} }
LOG_INFO(log, "Partition " << task_partition.name << " copied"); LOG_INFO(log, "Partition " << task_partition.name << " copied");
return true; return PartitionTaskStatus::Finished;
} }
void dropAndCreateLocalTable(const ASTPtr & create_ast) void dropAndCreateLocalTable(const ASTPtr & create_ast)
@ -1706,14 +1719,6 @@ protected:
interpreter.execute(); interpreter.execute();
} }
bool existsRemoteTable(const DatabaseAndTableName & table, Connection & connection)
{
String query = "EXISTS " + getDatabaseDotTable(table);
Block block = getBlockWithAllStreamData(std::make_shared<RemoteBlockInputStream>(
connection, query, InterpreterExistsQuery::getSampleBlock(), context));
return block.safeGetByPosition(0).column->getUInt(0) != 0;
}
String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings = nullptr) String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings = nullptr)
{ {
String query = "SHOW CREATE TABLE " + getDatabaseDotTable(table); String query = "SHOW CREATE TABLE " + getDatabaseDotTable(table);
@ -1781,11 +1786,11 @@ protected:
query = wb.str(); query = wb.str();
} }
LOG_DEBUG(log, "Computing destination partition set, executing query: " << query);
ParserQuery parser_query(query.data() + query.size()); ParserQuery parser_query(query.data() + query.size());
ASTPtr query_ast = parseQuery(parser_query, query); ASTPtr query_ast = parseQuery(parser_query, query);
LOG_DEBUG(log, "Computing destination partition set, executing query: " << query);
Context local_context = context; Context local_context = context;
local_context.setSettings(task_cluster->settings_pull); local_context.setSettings(task_cluster->settings_pull);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().in); Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().in);
@ -1809,6 +1814,34 @@ protected:
return res; return res;
} }
bool checkShardHasPartition(TaskShard & task_shard, const String & partition_quoted_name)
{
createShardInternalTables(task_shard, false);
TaskTable & task_table = task_shard.task_table;
String query;
{
WriteBufferFromOwnString wb;
wb << "SELECT 1"
<< " FROM "<< getDatabaseDotTable(task_shard.table_read_shard)
<< " WHERE " << queryToString(task_table.engine_push_partition_key_ast) << " = " << partition_quoted_name
<< " LIMIT 1";
query = wb.str();
}
LOG_DEBUG(log, "Checking shard " << task_shard.getDescription() << " for partition "
<< partition_quoted_name << " existence, executing query: " << query);
ParserQuery parser_query(query.data() + query.size());
ASTPtr query_ast = parseQuery(parser_query, query);
Context local_context = context;
local_context.setSettings(task_cluster->settings_pull);
InterpreterSelectQuery interp(query_ast, local_context);
return interp.execute().in->read().rows() != 0;
}
/** Executes simple query (without output streams, for example DDL queries) on each shard of the cluster /** Executes simple query (without output streams, for example DDL queries) on each shard of the cluster
* Returns number of shards for which at least one replica executed query successfully * Returns number of shards for which at least one replica executed query successfully
*/ */

View File

@ -318,7 +318,15 @@ void DistributedBlockOutputStream::writeSuffix()
pool->schedule([&job] () { job.stream->writeSuffix(); }); pool->schedule([&job] () { job.stream->writeSuffix(); });
} }
pool->wait(); try
{
pool->wait();
}
catch (Exception & exception)
{
exception.addMessage(getCurrentStateDescription());
throw;
}
LOG_DEBUG(&Logger::get("DistributedBlockOutputStream"), getCurrentStateDescription()); LOG_DEBUG(&Logger::get("DistributedBlockOutputStream"), getCurrentStateDescription());
} }