Fix error with Lazy database

This commit is contained in:
Alexey Milovidov 2020-06-02 05:06:16 +03:00
parent bdfa552844
commit a6bf4b5d70
14 changed files with 165 additions and 20 deletions

View File

@ -40,6 +40,10 @@ public:
virtual bool isValid() const = 0; virtual bool isValid() const = 0;
virtual const String & name() const = 0; virtual const String & name() const = 0;
/// This method can return nullptr if it's Lazy database
/// (a database with support for lazy tables loading
/// - it maintains a list of tables but tables are loaded lazily).
virtual const StoragePtr & table() const = 0; virtual const StoragePtr & table() const = 0;
virtual ~IDatabaseTablesIterator() = default; virtual ~IDatabaseTablesIterator() = default;

View File

@ -29,7 +29,8 @@ inline void forEachTable(F && f, const Context & context)
{ {
for (auto & elem : DatabaseCatalog::instance().getDatabases()) for (auto & elem : DatabaseCatalog::instance().getDatabases())
for (auto iterator = elem.second->getTablesIterator(context); iterator->isValid(); iterator->next()) for (auto iterator = elem.second->getTablesIterator(context); iterator->isValid(); iterator->next())
f(iterator->table()); if (auto table = iterator->table())
f(table);
} }

View File

@ -185,6 +185,9 @@ void AsynchronousMetrics::update()
{ {
++total_number_of_tables; ++total_number_of_tables;
const auto & table = iterator->table(); const auto & table = iterator->table();
if (!table)
continue;
StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get()); StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get());
StorageReplicatedMergeTree * table_replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(table.get()); StorageReplicatedMergeTree * table_replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(table.get());

View File

@ -376,8 +376,11 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)
DatabasePtr & database = elem.second; DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next()) for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{ {
if (dynamic_cast<const StorageReplicatedMergeTree *>(iterator->table().get())) if (auto table = iterator->table())
replica_names.emplace_back(StorageID{database->getDatabaseName(), iterator->name()}); {
if (dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
replica_names.emplace_back(StorageID{database->getDatabaseName(), iterator->name()});
}
} }
} }

View File

@ -47,6 +47,9 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{ {
const auto & table = iterator->table(); const auto & table = iterator->table();
if (!table)
continue;
StorageReplicatedMergeTree * table_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()); StorageReplicatedMergeTree * table_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get());
if (!table_replicated) if (!table_replicated)

View File

@ -79,7 +79,7 @@ StoragePtr StorageMerge::getFirstTable(F && predicate) const
bool StorageMerge::isRemote() const bool StorageMerge::isRemote() const
{ {
auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table->isRemote(); }); auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table && table->isRemote(); });
return first_remote_table != nullptr; return first_remote_table != nullptr;
} }
@ -117,7 +117,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
while (iterator->isValid()) while (iterator->isValid())
{ {
const auto & table = iterator->table(); const auto & table = iterator->table();
if (table.get() != this) if (table && table.get() != this)
{ {
++selected_table_size; ++selected_table_size;
stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context, to_stage, query_ptr)); stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context, to_stage, query_ptr));
@ -316,7 +316,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
while (iterator->isValid()) while (iterator->isValid())
{ {
const auto & table = iterator->table(); const auto & table = iterator->table();
if (table.get() != this) if (table && table.get() != this)
selected_tables.emplace_back( selected_tables.emplace_back(
table, table->lockStructureForShare(false, query_id, settings.lock_acquire_timeout), iterator->name()); table, table->lockStructureForShare(false, query_id, settings.lock_acquire_timeout), iterator->name());
@ -338,6 +338,8 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
while (iterator->isValid()) while (iterator->isValid())
{ {
StoragePtr storage = iterator->table(); StoragePtr storage = iterator->table();
if (!storage)
continue;
if (query && query->as<ASTSelectQuery>()->prewhere() && !storage->supportsPrewhere()) if (query && query->as<ASTSelectQuery>()->prewhere() && !storage->supportsPrewhere())
throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE.", ErrorCodes::ILLEGAL_PREWHERE); throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE.", ErrorCodes::ILLEGAL_PREWHERE);
@ -495,7 +497,7 @@ NamesAndTypesList StorageMerge::getVirtuals() const
{ {
NamesAndTypesList virtuals{{"_table", std::make_shared<DataTypeString>()}}; NamesAndTypesList virtuals{{"_table", std::make_shared<DataTypeString>()}};
auto first_table = getFirstTable([](auto &&) { return true; }); auto first_table = getFirstTable([](auto && table) { return table; });
if (first_table) if (first_table)
{ {
auto table_virtuals = first_table->getVirtuals(); auto table_virtuals = first_table->getVirtuals();

View File

@ -303,12 +303,15 @@ Pipes StorageSystemColumns::read(
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next()) for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{ {
const String & table_name = iterator->name(); if (const auto & table = iterator->table())
storages.emplace(std::piecewise_construct, {
std::forward_as_tuple(database_name, table_name), const String & table_name = iterator->name();
std::forward_as_tuple(iterator->table())); storages.emplace(std::piecewise_construct,
table_column_mut->insert(table_name); std::forward_as_tuple(database_name, table_name),
++offsets[i]; std::forward_as_tuple(table));
table_column_mut->insert(table_name);
++offsets[i];
}
} }
} }

View File

@ -39,6 +39,8 @@ static StorageSystemGraphite::Configs getConfigs(const Context & context)
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{ {
const auto & table = iterator->table(); const auto & table = iterator->table();
if (!table)
continue;
const MergeTreeData * table_data = dynamic_cast<const MergeTreeData *>(table.get()); const MergeTreeData * table_data = dynamic_cast<const MergeTreeData *>(table.get());
if (!table_data) if (!table_data)

View File

@ -53,13 +53,17 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, const Contex
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{ {
if (!dynamic_cast<const MergeTreeData *>(iterator->table().get())) const auto & table = iterator->table();
if (!table)
continue;
if (!dynamic_cast<const MergeTreeData *>(table.get()))
continue; continue;
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name())) if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name()))
continue; continue;
merge_tree_tables[db.first][iterator->name()] = iterator->table(); merge_tree_tables[db.first][iterator->name()] = table;
} }
} }

View File

@ -115,6 +115,9 @@ StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, const
{ {
String table_name = iterator->name(); String table_name = iterator->name();
StoragePtr storage = iterator->table(); StoragePtr storage = iterator->table();
if (!storage)
continue;
String engine_name = storage->getName(); String engine_name = storage->getName();
if (!dynamic_cast<MergeTreeData *>(storage.get())) if (!dynamic_cast<MergeTreeData *>(storage.get()))

View File

@ -78,11 +78,15 @@ Pipes StorageSystemReplicas::read(
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first);
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{ {
if (!dynamic_cast<const StorageReplicatedMergeTree *>(iterator->table().get())) const auto & table = iterator->table();
if (!table)
continue;
if (!dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
continue; continue;
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name())) if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name()))
continue; continue;
replicated_tables[db.first][iterator->name()] = iterator->table(); replicated_tables[db.first][iterator->name()] = table;
} }
} }

View File

@ -62,11 +62,14 @@ void StorageSystemReplicationQueue::fillData(MutableColumns & res_columns, const
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{ {
if (!dynamic_cast<const StorageReplicatedMergeTree *>(iterator->table().get())) const auto & table = iterator->table();
if (!table)
continue;
if (!dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
continue; continue;
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name())) if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name()))
continue; continue;
replicated_tables[db.first][iterator->name()] = iterator->table(); replicated_tables[db.first][iterator->name()] = table;
} }
} }

View File

@ -34,7 +34,8 @@ static NamesAndTypesList chooseColumns(const String & source_database, const Str
auto iterator = database->getTablesIterator(context, table_name_match); auto iterator = database->getTablesIterator(context, table_name_match);
if (iterator->isValid()) if (iterator->isValid())
any_table = iterator->table(); if (const auto & table = iterator->table())
any_table = table;
} }
if (!any_table) if (!any_table)

View File

@ -0,0 +1,109 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
export CURR_DATABASE="test_lazy_01294_concurrent_${CLICKHOUSE_DATABASE}"
function recreate_lazy_func1()
{
$CLICKHOUSE_CLIENT -q "
CREATE TABLE $CURR_DATABASE.log (a UInt64, b UInt64) ENGINE = Log;
";
while true; do
$CLICKHOUSE_CLIENT -q "
DETACH TABLE $CURR_DATABASE.log;
";
$CLICKHOUSE_CLIENT -q "
ATTACH TABLE $CURR_DATABASE.log;
";
done
}
function recreate_lazy_func2()
{
while true; do
$CLICKHOUSE_CLIENT -q "
CREATE TABLE $CURR_DATABASE.tlog (a UInt64, b UInt64) ENGINE = TinyLog;
";
$CLICKHOUSE_CLIENT -q "
DROP TABLE $CURR_DATABASE.tlog;
";
done
}
function recreate_lazy_func3()
{
$CLICKHOUSE_CLIENT -q "
CREATE TABLE $CURR_DATABASE.slog (a UInt64, b UInt64) ENGINE = StripeLog;
";
while true; do
$CLICKHOUSE_CLIENT -q "
ATTACH TABLE $CURR_DATABASE.slog;
";
$CLICKHOUSE_CLIENT -q "
DETACH TABLE $CURR_DATABASE.slog;
";
done
}
function recreate_lazy_func4()
{
while true; do
$CLICKHOUSE_CLIENT -q "
CREATE TABLE $CURR_DATABASE.tlog2 (a UInt64, b UInt64) ENGINE = TinyLog;
";
$CLICKHOUSE_CLIENT -q "
DROP TABLE $CURR_DATABASE.tlog2;
";
done
}
function test_func()
{
while true; do
$CLICKHOUSE_CLIENT -q "SYSTEM STOP TTL MERGES";
done
}
export -f recreate_lazy_func1;
export -f recreate_lazy_func2;
export -f recreate_lazy_func3;
export -f recreate_lazy_func4;
export -f test_func;
${CLICKHOUSE_CLIENT} -n -q "
DROP DATABASE IF EXISTS $CURR_DATABASE;
CREATE DATABASE $CURR_DATABASE ENGINE = Lazy(1);
"
TIMEOUT=30
timeout $TIMEOUT bash -c recreate_lazy_func1 2> /dev/null &
timeout $TIMEOUT bash -c recreate_lazy_func2 2> /dev/null &
timeout $TIMEOUT bash -c recreate_lazy_func3 2> /dev/null &
timeout $TIMEOUT bash -c recreate_lazy_func4 2> /dev/null &
timeout $TIMEOUT bash -c test_func 2> /dev/null &
wait
sleep 1
${CLICKHOUSE_CLIENT} -n -q "
DROP TABLE IF EXISTS $CURR_DATABASE.log;
DROP TABLE IF EXISTS $CURR_DATABASE.slog;
DROP TABLE IF EXISTS $CURR_DATABASE.tlog;
DROP TABLE IF EXISTS $CURR_DATABASE.tlog2;
"
$CLICKHOUSE_CLIENT -q "SYSTEM START TTL MERGES";
echo "Test OK"