Add ability to execute mutations asynchronously

This commit is contained in:
alesapin 2019-12-16 18:51:15 +03:00
parent 9f27e05dc8
commit 2695aa13c4
7 changed files with 220 additions and 5 deletions

View File

@ -388,6 +388,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.", 0) \
M(SettingBool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.", 0) \
M(SettingUInt64, mutation_synchronous_wait_timeout, 0, "Seconds to wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). After execute asynchronously. 0 - execute asynchronously from the begging.", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\

View File

@ -42,6 +42,7 @@ namespace ErrorCodes
extern const int PART_IS_TEMPORARILY_LOCKED;
extern const int UNKNOWN_SETTING;
extern const int TOO_BIG_AST;
extern const int UNFINISHED;
}
namespace ActionLocks
@ -425,17 +426,18 @@ public:
};
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
void StorageMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
{
/// Choose any disk, because when we load mutations we search them at each disk
/// where storage can be placed. See loadMutations().
auto disk = storage_policy->getAnyDisk();
MergeTreeMutationEntry entry(commands, getFullPathOnDisk(disk), insert_increment.get());
String file_name;
Int64 version;
{
std::lock_guard lock(currently_processing_in_background_mutex);
Int64 version = increment.get();
version = increment.get();
entry.commit(version);
file_name = entry.file_name;
auto insertion = current_mutations_by_id.emplace(file_name, std::move(entry));
@ -444,6 +446,17 @@ void StorageMergeTree::mutate(const MutationCommands & commands, const Context &
LOG_INFO(log, "Added mutation: " << file_name);
merging_mutating_task_handle->wake();
size_t timeout = query_context.getSettingsRef().mutation_synchronous_wait_timeout;
/// If timeout is set, than we can wait
if (timeout != 0)
{
LOG_INFO(log, "Waiting mutation: " << file_name << " for " << timeout << " seconds");
auto check = [version, this]() { return isMutationDone(version); };
std::unique_lock lock(mutation_wait_mutex);
if (!mutation_wait_event.wait_for(lock, std::chrono::seconds{timeout}, check))
throw Exception("Mutation " + file_name + " is not finished. Will be done asynchronously", ErrorCodes::UNFINISHED);
}
}
namespace
@ -462,6 +475,17 @@ bool comparator(const PartVersionWithName & f, const PartVersionWithName & s)
}
bool StorageMergeTree::isMutationDone(Int64 mutation_version) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
auto data_parts = getDataPartsVector();
for (const auto & data_part : data_parts)
if (data_part->info.getDataVersion() < mutation_version)
return false;
return true;
}
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
{
std::lock_guard lock(currently_processing_in_background_mutex);
@ -771,6 +795,9 @@ bool StorageMergeTree::tryMutatePart()
renameTempPartAndReplace(new_part);
tagger->is_successful = true;
write_part_log({});
/// Notify all, who wait for this or previous mutations
mutation_wait_event.notify_all();
}
catch (...)
{

View File

@ -79,6 +79,10 @@ public:
private:
/// Mutex and condvar for synchronous mutations wait
std::mutex mutation_wait_mutex;
std::condition_variable mutation_wait_event;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
@ -138,6 +142,8 @@ private:
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context);
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
/// Just checks versions of each active data part
bool isMutationDone(Int64 mutation_version) const;
friend class MergeTreeBlockOutputStream;
friend class MergeTreeData;

View File

@ -54,6 +54,7 @@
#include <thread>
#include <future>
#include <boost/algorithm/string/join.hpp>
namespace ProfileEvents
{
@ -309,6 +310,92 @@ bool StorageReplicatedMergeTree::checkFixedGranualrityInZookeeper()
}
void StorageReplicatedMergeTree::waitForAllReplicasToStatisfyNodeCondition(
size_t timeout, const String & name_for_logging,
const String & replica_relative_node_path, CheckNodeCallback callback) const
{
const auto operation_start = std::chrono::system_clock::now();
std::chrono::milliseconds total_time{timeout * 1000};
zkutil::EventPtr wait_event = std::make_shared<Poco::Event>();
Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
std::set<String> inactive_replicas;
std::set<String> timed_out_replicas;
for (const String & replica : replicas)
{
LOG_DEBUG(log, "Waiting for " << replica << " to apply " + name_for_logging);
bool operation_is_processed_by_relica = false;
while (!partial_shutdown_called)
{
auto zookeeper = getZooKeeper();
/// Replica could be inactive.
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
{
LOG_WARNING(log, "Replica " << replica << " is not active during mutation query."
<< name_for_logging << " will be done asynchronously when replica becomes active.");
inactive_replicas.emplace(replica);
break;
}
String node_for_check = zookeeper_path + "/replicas/" + replica + "/" + replica_relative_node_path;
std::string node_for_check_value;
Coordination::Stat stat;
/// Replica could be removed
if (!zookeeper->tryGet(node_for_check, node_for_check_value, &stat, wait_event))
{
LOG_WARNING(log, replica << " was removed");
operation_is_processed_by_relica = true;
break;
}
else /// in other case check required node
{
if (callback(node_for_check_value))
{
operation_is_processed_by_relica = true;
break; /// operation is done
}
}
std::chrono::milliseconds time_spent =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - operation_start);
std::chrono::milliseconds time_left = total_time - time_spent;
/// We have some time to wait
if (time_left.count() > 0)
wait_event->tryWait(time_left.count());
else /// Otherwise time is up
break;
}
if (partial_shutdown_called)
throw Exception(name_for_logging + " is not finished because table shutdown was called. " + name_for_logging + " will be done after table restart.",
ErrorCodes::UNFINISHED);
if (!operation_is_processed_by_relica && !inactive_replicas.count(replica))
timed_out_replicas.emplace(replica);
}
if (!inactive_replicas.empty() || !timed_out_replicas.empty())
{
std::stringstream exception_message;
exception_message << name_for_logging << " is not finished because";
if (!inactive_replicas.empty())
exception_message << " some replicas are inactive right now: " << boost::algorithm::join(inactive_replicas, ", ");
if (!timed_out_replicas.empty() && !inactive_replicas.empty())
exception_message << " and";
if (!timed_out_replicas.empty())
exception_message << " timeout when waiting for some replicas: " << boost::algorithm::join(timed_out_replicas, ", ");
exception_message << ". " << name_for_logging << " will be done asynchronously";
throw Exception(exception_message.str(), ErrorCodes::UNFINISHED);
}
}
void StorageReplicatedMergeTree::createNewZooKeeperNodes()
{
auto zookeeper = getZooKeeper();
@ -3200,6 +3287,7 @@ void StorageReplicatedMergeTree::alter(
int32_t new_version = -1; /// Initialization is to suppress (useless) false positive warning found by cppcheck.
};
/// /columns and /metadata nodes
std::vector<ChangedNode> changed_nodes;
{
@ -3294,6 +3382,10 @@ void StorageReplicatedMergeTree::alter(
time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout;
/// This code is quite similar with waitForAllReplicasToStatisfyNodeCondition
/// but contains more complicated details (versions manipulations, multiple nodes, etc.).
/// It will be removed soon in favor of alter-modify implementation on top of mutations.
/// TODO (alesap)
for (const String & replica : replicas)
{
LOG_DEBUG(log, "Waiting for " << replica << " to apply changes");
@ -3396,8 +3488,16 @@ void StorageReplicatedMergeTree::alter(
if (replica_nodes_changed_concurrently)
continue;
/// Now wait for replica nodes to change.
/// alter_query_event subscribed with zookeeper watch callback to /repliacs/{replica}/metadata
/// and /replicas/{replica}/columns nodes for current relica + shared nodes /columns and /metadata,
/// which is common for all replicas. If changes happen with this nodes (delete, set and create)
/// than event will be notified and wait will be interrupted.
///
/// ReplicatedMergeTreeAlterThread responsible for local /replicas/{replica}/metadata and
/// /replicas/{replica}/columns changes. Shared /columns and /metadata nodes can be changed by *newer*
/// concurrent alter from other replica. First of all it will update shared nodes and we will have no
/// ability to identify, that our *current* alter finshed. So we cannot do anything better than just
/// return from *current* alter with success result.
if (!replication_alter_columns_timeout)
{
alter_query_event->wait();
@ -4399,7 +4499,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
}
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context &)
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
{
/// Overview of the mutation algorithm.
///
@ -4502,6 +4602,20 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
else
throw Coordination::Exception("Unable to create a mutation znode", rc);
}
if (query_context.getSettingsRef().mutation_synchronous_wait_timeout != 0) /// some timeout specified
{
auto check_callback = [mutation_number = entry.znode_name](const String & zk_value)
{
/// Maybe we already processed more fresh mutation
/// We can compare their znode names (numbers like 0000000000 and 0000000001).
return zk_value >= mutation_number;
};
waitForAllReplicasToStatisfyNodeCondition(
query_context.getSettingsRef().mutation_synchronous_wait_timeout, "Mutation", "mutation_pointer", check_callback);
}
}
std::vector<MergeTreeMutationStatus> StorageReplicatedMergeTree::getMutationsStatus() const

View File

@ -532,6 +532,15 @@ private:
/// return true if it's fixed
bool checkFixedGranualrityInZookeeper();
using CheckNodeCallback = std::function<bool(const String & nodevalue_from_zookeeper)>;
/// Wait for timeout seconds when condition became true for node
/// /replicas/{replica}/replica_replative_node_path value for all replicas.
/// operation_name_for_logging used for logging about errors.
void waitForAllReplicasToStatisfyNodeCondition(
size_t timeout, const String & operaton_name_for_logging,
const String & replica_relative_node_path, CheckNodeCallback condition) const;
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/

View File

@ -0,0 +1,10 @@
Replicated
1
1
1
1
Normal
1
1
1
1

View File

@ -0,0 +1,48 @@
DROP TABLE IF EXISTS table_for_synchronous_mutations1;
DROP TABLE IF EXISTS table_for_synchronous_mutations2;
SELECT 'Replicated';
CREATE TABLE table_for_synchronous_mutations1(k UInt32, v1 UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/table_for_synchronous_mutations', '1') ORDER BY k PARTITION BY modulo(k, 2);
CREATE TABLE table_for_synchronous_mutations2(k UInt32, v1 UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/table_for_synchronous_mutations', '2') ORDER BY k PARTITION BY modulo(k, 2);
INSERT INTO table_for_synchronous_mutations1 select number, number from numbers(100000);
SYSTEM SYNC REPLICA table_for_synchronous_mutations2;
ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations1';
ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = 1 WHERE ignore(sleep(3)) SETTINGS mutation_synchronous_wait_timeout = 2; --{serverError 341}
-- Another mutation, just to be sure, that previous finished
ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations1';
DROP TABLE IF EXISTS table_for_synchronous_mutations1;
DROP TABLE IF EXISTS table_for_synchronous_mutations2;
SELECT 'Normal';
DROP TABLE IF EXISTS table_for_synchronous_mutations_no_replication;
CREATE TABLE table_for_synchronous_mutations_no_replication(k UInt32, v1 UInt64) ENGINE MergeTree ORDER BY k PARTITION BY modulo(k, 2);
INSERT INTO table_for_synchronous_mutations_no_replication select number, number from numbers(100000);
ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations_no_replication';
ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = 1 WHERE ignore(sleep(3)) SETTINGS mutation_synchronous_wait_timeout = 2; --{serverError 341}
-- Another mutation, just to be sure, that previous finished
ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations_no_replication';
DROP TABLE IF EXISTS table_for_synchronous_mutations_no_replication;