add log_name_to_exec to dbreplicated

This commit is contained in:
Val 2020-06-27 16:39:41 +03:00
parent e23c7a313e
commit 8273248c4e
9 changed files with 46 additions and 35 deletions

View File

@ -148,8 +148,9 @@ DatabasePtr DatabaseFactory::getImpl(
const auto & arguments = engine->arguments->children;
const auto zoo_path = arguments[0]->as<ASTLiteral>()->value.safeGet<String>();
const auto replica_name = arguments[1]->as<ASTLiteral>()->value.safeGet<String>();
const auto & zoo_path = safeGetLiteralValue<String>(arguments[0], "Replicated");
const auto & replica_name = safeGetLiteralValue<String>(arguments[1], "Replicated");
return std::make_shared<DatabaseReplicated>(database_name, metadata_path, zoo_path, replica_name, context);
}

View File

@ -127,7 +127,7 @@ DatabaseOnDisk::DatabaseOnDisk(
const String & metadata_path_,
const String & data_path_,
const String & logger,
const Context & context)
Context & context)
: DatabaseWithOwnTablesBase(name, logger, context)
, metadata_path(metadata_path_)
, data_path(data_path_)

View File

@ -86,7 +86,6 @@ protected:
const String metadata_path;
const String data_path;
const Context & global_context;
};
}

View File

@ -100,7 +100,7 @@ DatabaseOrdinary::DatabaseOrdinary(const String & name_, const String & metadata
}
DatabaseOrdinary::DatabaseOrdinary(
const String & name_, const String & metadata_path_, const String & data_path_, const String & logger, const Context & context_)
const String & name_, const String & metadata_path_, const String & data_path_, const String & logger, Context & context_)
: DatabaseWithDictionaries(name_, metadata_path_, data_path_, logger, context_)
{
}

View File

@ -13,6 +13,8 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/Lock.h>
#include <common/sleep.h>
namespace DB
{
@ -103,13 +105,15 @@ DatabaseReplicated::DatabaseReplicated(
}
snapshot_period = context_.getConfigRef().getInt("database_replicated_snapshot_period", 10);
LOG_DEBUG(log, "Snapshot period is set to " << snapshot_period << " log entries per one snapshot");
LOG_DEBUG(log, "Snapshot period is set to {} log entries per one snapshot", snapshot_period);
background_log_executor = context_.getReplicatedSchedulePool().createTask(database_name + "(DatabaseReplicated::background_executor)", [this]{ runBackgroundLogExecutor();} );
background_log_executor->scheduleAfter(500);
}
DatabaseReplicated::~DatabaseReplicated() = default;
void DatabaseReplicated::createDatabaseZKNodes() {
current_zookeeper = getZooKeeper();
@ -174,7 +178,13 @@ void DatabaseReplicated::runBackgroundLogExecutor() {
for (const String & log_entry_name : log_entry_names) {
String log_entry_path = zookeeper_path + "/log/" + log_entry_name;
executeFromZK(log_entry_path);
bool yield = false;
{
std::lock_guard lock(log_name_mutex);
if (log_name_to_exec_with_result == log_entry_name)
yield = true;
}
executeFromZK(log_entry_path, yield);
last_executed_log_entry = log_entry_name;
writeLastExecutedToDiskAndZK();
@ -203,12 +213,9 @@ void DatabaseReplicated::writeLastExecutedToDiskAndZK() {
out.close();
}
void DatabaseReplicated::executeFromZK(String & path) {
void DatabaseReplicated::executeFromZK(String & path, bool yield) {
current_zookeeper = getZooKeeper();
String query_to_execute = current_zookeeper->get(path, {}, NULL);
//ReadBufferFromString istr(query_to_execute);
//String dummy_string;
//WriteBufferFromString ostr(dummy_string);
try
{
@ -216,23 +223,29 @@ void DatabaseReplicated::executeFromZK(String & path) {
current_context->getClientInfo().query_kind = ClientInfo::QueryKind::REPLICATED_LOG_QUERY;
current_context->setCurrentDatabase(database_name);
current_context->setCurrentQueryId(""); // generate random query_id
//executeQuery(istr, ostr, false, *current_context, {});
executeQuery(query_to_execute, *current_context);
}
catch (...)
{
tryLogCurrentException(log, "Query from zookeeper " + query_to_execute + " wasn't finished successfully");
if (yield)
tryLogCurrentException(log, "Query from zookeeper " + query_to_execute + " wasn't finished successfully");
}
LOG_DEBUG(log, "Executed query: " << query_to_execute);
std::lock_guard lock(log_name_mutex);
log_name_to_exec_with_result.clear();
LOG_DEBUG(log, "Executed query: {}", query_to_execute);
}
void DatabaseReplicated::propose(const ASTPtr & query) {
current_zookeeper = getZooKeeper();
LOG_DEBUG(log, "Proposing query: " << queryToString(query));
current_zookeeper->create(zookeeper_path + "/log/log-", queryToString(query), zkutil::CreateMode::PersistentSequential);
LOG_DEBUG(log, "Proposing query: {}", queryToString(query));
{
std::lock_guard lock(log_name_mutex);
log_name_to_exec_with_result = current_zookeeper->create(zookeeper_path + "/log/log-", queryToString(query), zkutil::CreateMode::PersistentSequential);
}
background_log_executor->schedule();
}
@ -241,11 +254,11 @@ void DatabaseReplicated::createSnapshot() {
current_zookeeper = getZooKeeper();
String snapshot_path = zookeeper_path + "/snapshots/" + last_executed_log_entry;
if (Coordination::ZNODEEXISTS == current_zookeeper->tryCreate(snapshot_path, String(), zkutil::CreateMode::Persistent)) {
if (Coordination::Error::ZNODEEXISTS == current_zookeeper->tryCreate(snapshot_path, String(), zkutil::CreateMode::Persistent)) {
return;
}
for (auto iterator = getTablesIterator({}); iterator->isValid(); iterator->next()) {
for (auto iterator = getTablesIterator(global_context, {}); iterator->isValid(); iterator->next()) {
String table_name = iterator->name();
auto query = getCreateQueryFromMetadata(getObjectMetadataPath(table_name), true);
String statement = queryToString(query);
@ -262,7 +275,7 @@ void DatabaseReplicated::loadMetadataFromSnapshot() {
current_zookeeper = getZooKeeper();
Strings snapshots;
if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots", snapshots) != Coordination::ZOK)
if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots", snapshots) != Coordination::Error::ZOK)
return;
auto latest_snapshot = std::max_element(snapshots.begin(), snapshots.end());
@ -277,14 +290,14 @@ void DatabaseReplicated::loadMetadataFromSnapshot() {
Strings metadatas;
if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots/" + *latest_snapshot, metadatas) != Coordination::ZOK)
if (current_zookeeper->tryGetChildren(zookeeper_path + "/snapshots/" + *latest_snapshot, metadatas) != Coordination::Error::ZOK)
return;
LOG_DEBUG(log, "Executing " << *latest_snapshot << " snapshot");
LOG_DEBUG(log, "Executing {} snapshot", *latest_snapshot);
for (auto t = metadatas.begin(); t != metadatas.end(); ++t) {
String path = zookeeper_path + "/snapshots/" + *latest_snapshot + "/" + *t;
executeFromZK(path);
executeFromZK(path, false);
}
last_executed_log_entry = *latest_snapshot;

View File

@ -4,6 +4,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h>
namespace DB
{
/** DatabaseReplicated engine
@ -33,6 +34,8 @@ class DatabaseReplicated : public DatabaseAtomic
public:
DatabaseReplicated(const String & name_, const String & metadata_path_, const String & zookeeper_path_, const String & replica_name_, Context & context);
~DatabaseReplicated();
void drop(const Context & /*context*/) override;
String getEngineName() const override { return "Replicated"; }
@ -47,7 +50,7 @@ private:
void runBackgroundLogExecutor();
void executeFromZK(String & path);
void executeFromZK(String & path, bool yield);
void writeLastExecutedToDiskAndZK();
@ -57,6 +60,10 @@ private:
std::unique_ptr<Context> current_context; // to run executeQuery
//BlockIO execution_result;
std::mutex log_name_mutex;
String log_name_to_exec_with_result;
int snapshot_period;
String last_executed_log_entry = "";

View File

@ -93,7 +93,7 @@ BlockIO InterpreterDropQuery::executeToTable(
{
context.checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
table->shutdown();
TableStructureWriteLockHolder table_lock;
TableExclusiveLockHolder table_lock;
if (database->getEngineName() != "Atomic" && database->getEngineName() != "Replicated")
table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
/// Drop table from memory, don't touch data and metadata
@ -111,7 +111,6 @@ BlockIO InterpreterDropQuery::executeToTable(
auto table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
/// Drop table data, don't touch metadata
auto table_lock = table->lockExclusively(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) {
database->propose(query_ptr);
} else {

View File

@ -634,14 +634,6 @@ static StoragePtr create(const StorageFactory::Arguments & args)
throw Exception("You must set the setting `allow_experimental_data_skipping_indices` to 1 " \
"before using data skipping indices.", ErrorCodes::BAD_ARGUMENTS);
StorageInMemoryMetadata metadata(args.columns, indices_description, args.constraints);
metadata.partition_by_ast = partition_by_ast;
metadata.order_by_ast = order_by_ast;
metadata.primary_key_ast = primary_key_ast;
metadata.ttl_for_table_ast = ttl_table_ast;
metadata.sample_by_ast = sample_by_ast;
metadata.settings_ast = settings_ast;
if (replicatedStorage)
return StorageReplicatedMergeTree::create(
zookeeper_path, replica_name, args.attach, args.table_id, args.relative_data_path,

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python2
#-*- coding: utf-8 -*-
import subprocess
import os
@ -105,7 +105,7 @@ if __name__ == "__main__":
bridge_bin=args.bridge_binary,
cfg=args.configs_dir,
pth=args.clickhouse_root,
opts=' '.join(args.pytest_args),
opts='-vv ' + ' '.join(args.pytest_args),
img=DIND_INTEGRATION_TESTS_IMAGE_NAME,
name=CONTAINER_NAME,
command=args.command