change replication algorithm, remove zk lock

In this version of the databaseReplicated sequential persistent zk nodes
are used to order DDL queries. Db replicated ddl queries are executed
in the backgrould pool no matter whether it's proposed by the same
replica or not.
This commit is contained in:
Val 2020-06-07 14:20:05 +03:00
parent 469f9738df
commit f928c897cf
6 changed files with 55 additions and 65 deletions

View File

@ -132,19 +132,34 @@ void DatabaseReplicated::createDatabaseZKNodes() {
void DatabaseReplicated::runBackgroundLogExecutor() {
current_zookeeper = getZooKeeper();
String last_n = current_zookeeper->get(zookeeper_path + "/last_entry", {}, NULL);
size_t last_n_parsed = parse<size_t>(last_n);
Strings log_entry_names = current_zookeeper->getChildren(zookeeper_path + "/log");
bool newEntries = current_log_entry_n < last_n_parsed;
while (current_log_entry_n < last_n_parsed) {
current_log_entry_n++;
String log_path = zookeeper_path + "/log/log." + std::to_string(current_log_entry_n);
executeFromZK(log_path);
}
if (newEntries) {
saveState();
std::sort(log_entry_names.begin(), log_entry_names.end());
auto newest_entry_it = std::upper_bound(log_entry_names.begin(), log_entry_names.end(), last_executed_log_entry);
log_entry_names.erase(log_entry_names.begin(), newest_entry_it);
for (const String & log_entry_name : log_entry_names) {
String log_entry_path = zookeeper_path + "/log/" + log_entry_name;
executeFromZK(log_entry_path);
last_executed_log_entry = log_entry_name;
}
background_log_executor->scheduleAfter(500);
// String last_n = current_zookeeper->get(zookeeper_path + "/last_entry", {}, NULL);
// size_t last_n_parsed = parse<size_t>(last_n);
// bool newEntries = current_log_entry_n < last_n_parsed;
// while (current_log_entry_n < last_n_parsed) {
// current_log_entry_n++;
// String log_path = zookeeper_path + "/log/log." + std::to_string(current_log_entry_n);
// executeFromZK(log_path);
// }
// if (newEntries) {
// saveState();
// }
// background_log_executor->scheduleAfter(500);
}
void DatabaseReplicated::saveState() {
@ -187,53 +202,22 @@ void DatabaseReplicated::executeFromZK(String & path) {
}
// TODO Move to ZooKeeper/Lock and remove it from here and ddlworker
static std::unique_ptr<zkutil::Lock> createSimpleZooKeeperLock(
const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
{
auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
zookeeper_holder->initFromInstance(zookeeper);
return std::make_unique<zkutil::Lock>(std::move(zookeeper_holder), lock_prefix, lock_name, lock_message);
}
// static std::unique_ptr<zkutil::Lock> createSimpleZooKeeperLock(
// const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
// {
// auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
// zookeeper_holder->initFromInstance(zookeeper);
// return std::make_unique<zkutil::Lock>(std::move(zookeeper_holder), lock_prefix, lock_name, lock_message);
// }
void DatabaseReplicated::propose(const ASTPtr & query) {
// TODO remove that log message i think
LOG_DEBUG(log, "PROPOSING\n" << queryToString(query));
current_zookeeper = getZooKeeper();
auto lock = createSimpleZooKeeperLock(current_zookeeper, zookeeper_path, "propose_lock", replica_name);
while (!lock->tryLock()) {
// TODO it seems that zk lock doesn't work at all
// need to find a different solution for proposal
pcg64 rng(randomSeed());
std::this_thread::sleep_for(std::chrono::milliseconds(std::uniform_int_distribution<int>(0, 1000)(rng)));
}
LOG_DEBUG(log, "PROPOSINGGG query: " << queryToString(query));
current_zookeeper->create(zookeeper_path + "/log/log-", queryToString(query), zkutil::CreateMode::PersistentSequential);
// schedule and deactive combo
// ensures that replica is up to date
// and since propose lock is acquired,
// no other propose can happen from
// different replicas during this call
background_log_executor->schedule();
background_log_executor->deactivate();
// if (current_log_entry_n > 5) { // make a settings variable
// // TODO check that all the replicas are up to date!
// updateSnapshot();
// current_log_entry_n = 0;
// current_zookeeper->removeChildren(zookeeper_path + "/log");
// }
current_log_entry_n++; // starting from 1
String log_entry = zookeeper_path + "/log/log." + std::to_string(current_log_entry_n);
current_zookeeper->createOrUpdate(log_entry, queryToString(query), zkutil::CreateMode::Persistent);
current_zookeeper->createOrUpdate(zookeeper_path + "/last_entry", std::to_string(current_log_entry_n), zkutil::CreateMode::Persistent);
lock->unlock();
saveState();
background_log_executor->activateAndSchedule();
}
void DatabaseReplicated::updateSnapshot() {

View File

@ -59,6 +59,8 @@ private:
std::atomic<size_t> current_log_entry_n = 0;
String last_executed_log_entry = "";
BackgroundSchedulePool::TaskHolder background_log_executor;
String replica_path;

View File

@ -52,6 +52,7 @@ BlockIO InterpreterAlterQuery::execute()
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY && !table->supportsReplication()) {
database->propose(query_ptr);
return {};
}
/// Add default database to table identifiers that we can encounter in e.g. default expressions,

View File

@ -688,6 +688,11 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
return true;
}
if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) {
database->propose(query_ptr);
return true;
}
StoragePtr res;
/// NOTE: CREATE query may be rewritten by Storage creator or table function
if (create.as_table_function)
@ -707,11 +712,6 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
properties.constraints,
false);
}
if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) {
database->propose(query_ptr);
}
database->createTable(context, table_name, res, query_ptr);
/// We must call "startup" and "shutdown" while holding DDLGuard.

View File

@ -99,8 +99,9 @@ BlockIO InterpreterDropQuery::executeToTable(
/// Drop table from memory, don't touch data and metadata
if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) {
database->propose(query_ptr);
} else {
database->detachTable(table_id.table_name);
}
database->detachTable(table_id.table_name);
}
else if (query.kind == ASTDropQuery::Kind::Truncate)
{
@ -113,8 +114,9 @@ BlockIO InterpreterDropQuery::executeToTable(
auto table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) {
database->propose(query_ptr);
} else {
table->truncate(query_ptr, metadata_snapshot, context, table_lock);
}
table->truncate(query_ptr, metadata_snapshot, context, table_lock);
}
else if (query.kind == ASTDropQuery::Kind::Drop)
{
@ -129,8 +131,9 @@ BlockIO InterpreterDropQuery::executeToTable(
if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) {
database->propose(query_ptr);
} else {
database->dropTable(context, table_id.table_name, query.no_delay);
}
database->dropTable(context, table_id.table_name, query.no_delay);
}
}

View File

@ -83,15 +83,15 @@ BlockIO InterpreterRenameQuery::execute()
DatabasePtr database = database_catalog.getDatabase(elem.from_database_name);
if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) {
database->propose(query_ptr);
} else {
database->renameTable(
context,
elem.from_table_name,
*database_catalog.getDatabase(elem.to_database_name),
elem.to_table_name,
rename.exchange);
}
database->renameTable(
context,
elem.from_table_name,
*database_catalog.getDatabase(elem.to_database_name),
elem.to_table_name,
rename.exchange);
}
return {};
}