This commit is contained in:
Alexander Tokmakov 2020-04-08 04:02:00 +03:00
parent 4c48b7dd80
commit 135197aa75
13 changed files with 67 additions and 50 deletions

View File

@ -54,17 +54,11 @@ namespace ErrorCodes
#endif
#if !defined(__NR_renameat2)
[[noreturn]]
#endif
void renameNoReplace(const std::string & old_path, const std::string & new_path)
{
renameat2(old_path, new_path, RENAME_NOREPLACE);
}
#if !defined(__NR_renameat2)
[[noreturn]]
#endif
void renameExchange(const std::string & old_path, const std::string & new_path)
{
renameat2(old_path, new_path, RENAME_EXCHANGE);

View File

@ -1,13 +1,23 @@
#pragma once
#include <string>
#if defined(_GNU_SOURCE)
#include <sys/syscall.h>
#endif
namespace DB
{
/// Atomically rename old_path to new_path. If new_path exists, do not overwrite it and throw exception
#if !defined(__NR_renameat2)
[[noreturn]]
#endif
void renameNoReplace(const std::string & old_path, const std::string & new_path);
/// Atomically exchange oldpath and newpath. Throw exception if some of them does not exist
#if !defined(__NR_renameat2)
[[noreturn]]
#endif
void renameExchange(const std::string & old_path, const std::string & new_path);
}

View File

@ -394,6 +394,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \
\
M(SettingDefaultDatabaseEngine, default_database_engine, /*DefaultDatabaseEngine::Ordinary*/ DefaultDatabaseEngine::Atomic, "Default database engine.", 0) \
M(SettingBool, allow_experimental_database_atomic, /*false*/ true, "Allow to create database with Engine=Atomic.", 0) \
M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.", 0) \
M(SettingBool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.", 0) \
M(SettingUInt64, mutations_sync, 0, "Wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). 0 - execute asynchronously. 1 - wait current server. 2 - wait all replicas if they exist.", 0) \

View File

@ -21,7 +21,7 @@ namespace ErrorCodes
class AtomicDatabaseTablesSnapshotIterator final : public DatabaseTablesSnapshotIterator
{
public:
AtomicDatabaseTablesSnapshotIterator(DatabaseTablesSnapshotIterator && base)
explicit AtomicDatabaseTablesSnapshotIterator(DatabaseTablesSnapshotIterator && base)
: DatabaseTablesSnapshotIterator(std::move(base)) {}
UUID uuid() const override { return table()->getStorageID().uuid; }
};

View File

@ -376,6 +376,7 @@ DatabaseCatalog::DatabaseCatalog(Context * global_context_)
{
if (!global_context)
throw Exception("DatabaseCatalog is not initialized. It's a bug.", ErrorCodes::LOGICAL_ERROR);
drop_delay_s = global_context->getConfigRef().getInt("database_atomic_delay_before_drop_table_s", 60);
}
DatabaseCatalog & DatabaseCatalog::init(Context * global_context_)
@ -551,11 +552,11 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
drop_time = Poco::File(dropped_metadata_path).getLastModified().epochTime();
}
std::lock_guard lock(tables_marked_droped_mutex);
std::lock_guard lock(tables_marked_dropped_mutex);
if (ignore_delay)
tables_marked_droped.push_front({table_id, table, dropped_metadata_path, 0});
tables_marked_dropped.push_front({table_id, table, dropped_metadata_path, 0});
else
tables_marked_droped.push_back({table_id, table, dropped_metadata_path, drop_time});
tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time});
}
void DatabaseCatalog::dropTableDataTask()
@ -563,19 +564,19 @@ void DatabaseCatalog::dropTableDataTask()
TableMarkedAsDropped table;
try
{
std::lock_guard lock(tables_marked_droped_mutex);
std::lock_guard lock(tables_marked_dropped_mutex);
time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
auto it = std::find_if(tables_marked_droped.begin(), tables_marked_droped.end(), [current_time](const auto & elem)
auto it = std::find_if(tables_marked_dropped.begin(), tables_marked_dropped.end(), [&](const auto & elem)
{
bool not_in_use = !elem.table || elem.table.unique();
bool old_enough = elem.drop_time + drop_delay_s < current_time;
return not_in_use && old_enough;
});
if (it != tables_marked_droped.end())
if (it != tables_marked_dropped.end())
{
table = std::move(*it);
LOG_INFO(log, "Will try drop " + table.table_id.getNameForLogs());
tables_marked_droped.erase(it);
tables_marked_dropped.erase(it);
}
}
catch (...)
@ -595,8 +596,8 @@ void DatabaseCatalog::dropTableDataTask()
tryLogCurrentException(log, "Cannot drop table " + table.table_id.getNameForLogs() +
". Will retry later.");
{
std::lock_guard lock(tables_marked_droped_mutex);
tables_marked_droped.emplace_back(std::move(table));
std::lock_guard lock(tables_marked_dropped_mutex);
tables_marked_dropped.emplace_back(std::move(table));
}
}
}

View File

@ -194,7 +194,6 @@ private:
void dropTableDataTask();
void dropTableFinally(const TableMarkedAsDropped & table) const;
static constexpr time_t drop_delay_s = 10;
static constexpr size_t reschedule_time_ms = 100;
private:
@ -219,10 +218,11 @@ private:
/// If you capture mutex and ddl_guards_mutex, then you need to grab them strictly in this order.
mutable std::mutex ddl_guards_mutex;
TablesMarkedAsDropped tables_marked_droped;
mutable std::mutex tables_marked_droped_mutex;
TablesMarkedAsDropped tables_marked_dropped;
mutable std::mutex tables_marked_dropped_mutex;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> drop_task;
time_t drop_delay_s;
};
}

View File

@ -102,6 +102,10 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
engine->name = old_style_database ? "Ordinary" : "Atomic";
storage->set(storage->engine, engine);
create.set(create.storage, storage);
if (!context.getSettingsRef().allow_experimental_database_atomic)
throw Exception("Atomic is an experimental database engine. Enable allow_experimental_database_atomic to use it.",
ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
else if ((create.columns_list && create.columns_list->indices && !create.columns_list->indices->children.empty()))
{

View File

@ -26,8 +26,7 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
}
else
settings.ostr << (settings.hilite ? hilite_none : "")
<< (!table_id.database_name.empty() ? backQuoteIfNeed(table_id.database_name) + "." : "") << backQuoteIfNeed(table_id.table_name)
<< (table_id.hasUUID() ? " UUID " : "") << (table_id.hasUUID() ? quoteString(toString(table_id.uuid)) : "");
<< (!table_id.database_name.empty() ? backQuoteIfNeed(table_id.database_name) + "." : "") << backQuoteIfNeed(table_id.table_name);
if (columns)
{

View File

@ -330,30 +330,33 @@ void StorageMaterializedView::mutate(const MutationCommands & commands, const Co
void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)
{
auto old_table_id = getStorageID();
auto old_target_table_name = generateInnerTableName(old_table_id);
auto new_target_table_name = generateInnerTableName(new_table_id);
bool inner_same_name = old_table_id.database_name == new_table_id.database_name &&
old_target_table_name == new_target_table_name;
if (has_inner_table && tryGetTargetTable() && !inner_same_name)
if (has_inner_table && tryGetTargetTable())
{
auto rename = std::make_shared<ASTRenameQuery>();
auto old_target_table_name = generateInnerTableName(old_table_id);
auto new_target_table_name = generateInnerTableName(new_table_id);
if (old_table_id.database_name != new_table_id.database_name ||
old_target_table_name != new_target_table_name)
{
ASTRenameQuery::Table from;
from.database = target_table_id.database_name;
from.table = target_table_id.table_name;
auto rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Table to;
to.database = target_table_id.database_name;
to.table = new_target_table_name;
ASTRenameQuery::Table from;
from.database = target_table_id.database_name;
from.table = target_table_id.table_name;
ASTRenameQuery::Element elem;
elem.from = from;
elem.to = to;
rename->elements.emplace_back(elem);
ASTRenameQuery::Table to;
to.database = target_table_id.database_name;
to.table = new_target_table_name;
InterpreterRenameQuery(rename, global_context).execute();
target_table_id.table_name = new_target_table_name;
ASTRenameQuery::Element elem;
elem.from = from;
elem.to = to;
rename->elements.emplace_back(elem);
InterpreterRenameQuery(rename, global_context).execute();
target_table_id.table_name = new_target_table_name;
}
}
IStorage::renameInMemory(new_table_id);

View File

@ -24,7 +24,8 @@ def started_cluster():
def drop_table(nodes, table_name):
for node in nodes:
node.query("DROP TABLE IF EXISTS {}".format(table_name))
node.query("DROP TABLE IF EXISTS {} NO DELAY".format(table_name))
time.sleep(1)
def test_ttl_columns(started_cluster):
drop_table([node1, node2], "test_ttl")

View File

@ -21,7 +21,7 @@ ${CLICKHOUSE_CLIENT} -n -q "
timeout 10s ${CLICKHOUSE_CLIENT} -n -q "
SET receive_timeout=1;
SYSTEM SYNC REPLICA $R2
" 2>&1 | fgrep -q "DB::Exception: SYNC REPLICA ${CLICKHOUSE_DATABASE}.$R2: command timed out!" && echo 'OK' || echo 'Failed!'
" 2>&1 | fgrep -q "Code: 159. DB::Exception" && echo 'OK' || echo 'Failed!'
# By dropping tables all related SYNC REPLICA queries would be terminated as well
${CLICKHOUSE_CLIENT} -n -q "

View File

@ -22,8 +22,9 @@ function query_with_retry
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS $CLICKHOUSE_DATABASE.src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS $CLICKHOUSE_DATABASE.dst;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS $CLICKHOUSE_DATABASE.src NO DELAY;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS $CLICKHOUSE_DATABASE.dst NO DELAY;"
sleep 1
$CLICKHOUSE_CLIENT --query="CREATE TABLE $CLICKHOUSE_DATABASE.src (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_DATABASE/src', '1') PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE $CLICKHOUSE_DATABASE.dst (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_DATABASE/dst', '1') PARTITION BY p ORDER BY k SETTINGS old_parts_lifetime=1, cleanup_delay_period=1, cleanup_delay_period_random_add=0;"
@ -50,8 +51,9 @@ $CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA $CLICKHOUSE_DATABASE.dst;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM $CLICKHOUSE_DATABASE.src;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM $CLICKHOUSE_DATABASE.dst;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS $CLICKHOUSE_DATABASE.src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS $CLICKHOUSE_DATABASE.dst;"
$CLICKHOUSE_CLIENT --query="DROP TABLE $CLICKHOUSE_DATABASE.src NO DELAY;"
$CLICKHOUSE_CLIENT --query="DROP TABLE $CLICKHOUSE_DATABASE.dst NO DELAY;"
sleep 1
$CLICKHOUSE_CLIENT --query="SELECT 'MOVE incompatible schema missing column';"
@ -69,8 +71,9 @@ $CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA $CLICKHOUSE_DATABASE.dst;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM $CLICKHOUSE_DATABASE.src;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM $CLICKHOUSE_DATABASE.dst;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS $CLICKHOUSE_DATABASE.src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS $CLICKHOUSE_DATABASE.dst;"
$CLICKHOUSE_CLIENT --query="DROP TABLE $CLICKHOUSE_DATABASE.src NO DELAY;"
$CLICKHOUSE_CLIENT --query="DROP TABLE $CLICKHOUSE_DATABASE.dst NO DELAY;"
sleep 1
$CLICKHOUSE_CLIENT --query="SELECT 'MOVE incompatible schema different order by';"
@ -89,6 +92,7 @@ $CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA $CLICKHOUSE_DATABASE.dst;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM $CLICKHOUSE_DATABASE.src;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM $CLICKHOUSE_DATABASE.dst;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS $CLICKHOUSE_DATABASE.src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS $CLICKHOUSE_DATABASE.dst;"
$CLICKHOUSE_CLIENT --query="DROP TABLE $CLICKHOUSE_DATABASE.src NO DELAY;"
$CLICKHOUSE_CLIENT --query="DROP TABLE $CLICKHOUSE_DATABASE.dst NO DELAY;"
sleep 1

View File

@ -60,9 +60,9 @@ wait
echo "DROP TABLE $CLICKHOUSE_DATABASE.src NO DELAY" | ${CLICKHOUSE_CLIENT}
echo "DROP TABLE $CLICKHOUSE_DATABASE.dst NO DELAY" | ${CLICKHOUSE_CLIENT}
sleep 5
# Check for deadlocks
echo "SELECT * FROM system.processes WHERE query_id LIKE 'query%'" | ${CLICKHOUSE_CLIENT}
echo 'did not crash'
sleep 1