Avoid table does not exist errors if nested is unavailable

This commit is contained in:
kssenii 2021-02-20 21:28:14 +00:00
parent 883cc2c0ef
commit 427aad80a1
4 changed files with 64 additions and 64 deletions

View File

@ -88,6 +88,7 @@ void DatabasePostgreSQLReplica<Base>::startSynchronization()
: (global_context.getSettingsRef().max_insert_block_size.value),
global_context.getMacros()->expand(settings->postgresql_tables_list.value));
/// TODO: may be no need to always fetch
std::unordered_set<std::string> tables_to_replicate = replication_handler->fetchRequiredTables(connection->conn());
for (const auto & table_name : tables_to_replicate)
@ -160,7 +161,9 @@ StoragePtr DatabasePostgreSQLReplica<Base>::tryGetTable(const String & name, con
}
auto table = tables.find(name);
if (table != tables.end() && table->second->as<StoragePostgreSQLReplica>()->isNestedLoaded())
/// Here it is possible that nested table is temporarily out of reach, but return storage anyway,
/// it will not allow to read if nested is unavailable at the moment
if (table != tables.end())
return table->second;
return StoragePtr{};

View File

@ -58,7 +58,6 @@ public:
void shutdown() override;
private:
void startSynchronization();

View File

@ -352,6 +352,7 @@ void StoragePostgreSQLReplica::dropNested()
interpreter.execute();
nested_storage = nullptr;
LOG_WARNING(&Poco::Logger::get("StoragePostgreSQLReplica"), "Dropped (or temporarily) nested table {}", getNestedTableName());
}
@ -373,79 +374,78 @@ Pipe StoragePostgreSQLReplica::read(
size_t max_block_size,
unsigned num_streams)
{
/// If initial table sync has not yet finished, nested tables might not be created yet.
/// Or nested table might be attempted to get dropped. (Second mutex lock in dropNested()).
/// TODO: are there other places where this lock is needed
std::unique_lock lock(nested_mutex, std::defer_lock);
if (!nested_loaded || !lock.try_lock())
if (nested_loaded && lock.try_lock())
{
LOG_WARNING(&Poco::Logger::get("StoragePostgreSQLReplica"), "Table {} is not loaded yet", getNestedTableName());
return Pipe();
}
if (!nested_storage)
getNested();
/// Should throw if there is no nested storage
if (!nested_storage)
getNested();
auto storage_lock = nested_storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto storage_lock = nested_storage->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr();
Block nested_header = nested_metadata->getSampleBlock();
const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr();
ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);
ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1);
Block nested_header = nested_metadata->getSampleBlock();
ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);
ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1);
NameSet column_names_set = NameSet(column_names.begin(), column_names.end());
NameSet column_names_set = NameSet(column_names.begin(), column_names.end());
if (ASTSelectQuery * select_query = query_info.query->as<ASTSelectQuery>(); select_query && !column_names_set.count(version_column.name))
{
auto & tables_in_select_query = select_query->tables()->as<ASTTablesInSelectQuery &>();
if (!tables_in_select_query.children.empty())
if (ASTSelectQuery * select_query = query_info.query->as<ASTSelectQuery>(); select_query && !column_names_set.count(version_column.name))
{
auto & tables_element = tables_in_select_query.children[0]->as<ASTTablesInSelectQueryElement &>();
auto & tables_in_select_query = select_query->tables()->as<ASTTablesInSelectQuery &>();
if (tables_element.table_expression)
tables_element.table_expression->as<ASTTableExpression &>().final = true;
if (!tables_in_select_query.children.empty())
{
auto & tables_element = tables_in_select_query.children[0]->as<ASTTablesInSelectQueryElement &>();
if (tables_element.table_expression)
tables_element.table_expression->as<ASTTableExpression &>().final = true;
}
}
}
String filter_column_name;
Names require_columns_name = column_names;
ASTPtr expressions = std::make_shared<ASTExpressionList>();
if (column_names_set.empty() || !column_names_set.count(sign_column.name))
{
require_columns_name.emplace_back(sign_column.name);
const auto & sign_column_name = std::make_shared<ASTIdentifier>(sign_column.name);
const auto & fetch_sign_value = std::make_shared<ASTLiteral>(Field(Int8(1)));
expressions->children.emplace_back(makeASTFunction("equals", sign_column_name, fetch_sign_value));
filter_column_name = expressions->children.back()->getColumnName();
for (const auto & column_name : column_names)
expressions->children.emplace_back(std::make_shared<ASTIdentifier>(column_name));
}
Pipe pipe = nested_storage->read(
require_columns_name,
nested_metadata, query_info, context,
processed_stage, max_block_size, num_streams);
pipe.addTableLock(storage_lock);
if (!expressions->children.empty() && !pipe.empty())
{
Block pipe_header = pipe.getHeader();
auto syntax = TreeRewriter(context).analyze(expressions, pipe_header.getNamesAndTypesList());
ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expressions, syntax, context).getActions(true);
pipe.addSimpleTransform([&](const Block & header)
String filter_column_name;
Names require_columns_name = column_names;
ASTPtr expressions = std::make_shared<ASTExpressionList>();
if (column_names_set.empty() || !column_names_set.count(sign_column.name))
{
return std::make_shared<FilterTransform>(header, expression_actions, filter_column_name, false);
});
require_columns_name.emplace_back(sign_column.name);
const auto & sign_column_name = std::make_shared<ASTIdentifier>(sign_column.name);
const auto & fetch_sign_value = std::make_shared<ASTLiteral>(Field(Int8(1)));
expressions->children.emplace_back(makeASTFunction("equals", sign_column_name, fetch_sign_value));
filter_column_name = expressions->children.back()->getColumnName();
for (const auto & column_name : column_names)
expressions->children.emplace_back(std::make_shared<ASTIdentifier>(column_name));
}
Pipe pipe = nested_storage->read(
require_columns_name,
nested_metadata, query_info, context,
processed_stage, max_block_size, num_streams);
pipe.addTableLock(storage_lock);
if (!expressions->children.empty() && !pipe.empty())
{
Block pipe_header = pipe.getHeader();
auto syntax = TreeRewriter(context).analyze(expressions, pipe_header.getNamesAndTypesList());
ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expressions, syntax, context).getActions(true);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(header, expression_actions, filter_column_name, false);
});
}
return pipe;
}
return pipe;
LOG_WARNING(&Poco::Logger::get("StoragePostgreSQLReplica"), "Nested table {} is unavailable or is not loaded yet", getNestedTableName());
return Pipe();
}

View File

@ -316,14 +316,12 @@ def test_table_schema_changes(started_cluster):
check_tables_are_synchronized('postgresql_replica_{}'.format(i));
expected = instance.query("SELECT key, value1, value3 FROM test_database.postgresql_replica_3 ORDER BY key");
cursor.execute("ALTER TABLE postgresql_replica_4 DROP COLUMN value2")
cursor.execute("ALTER TABLE postgresql_replica_{} DROP COLUMN value2".format(random.randint(0, 4)))
for i in range(NUM_TABLES):
cursor.execute("INSERT INTO postgresql_replica_{} VALUES (50, {}, {})".format(i, i, i))
cursor.execute("UPDATE postgresql_replica_{} SET value3 = 12 WHERE key%2=0".format(i))
time.sleep(4)
print("Sync check")
for i in range(NUM_TABLES):
check_tables_are_synchronized('postgresql_replica_{}'.format(i));