Support GENERATED and DEFAULT columns

This commit is contained in:
kssenii 2023-12-06 11:39:04 +01:00
parent f39f0c6a74
commit 806061642a
12 changed files with 471 additions and 207 deletions

View File

@ -20,6 +20,11 @@ namespace ErrorCodes
extern const int UNKNOWN_TYPE;
}
ExternalResultDescription::ExternalResultDescription(const Block & sample_block_)
{
init(sample_block_);
}
void ExternalResultDescription::init(const Block & sample_block_)
{
sample_block = sample_block_;

View File

@ -41,6 +41,9 @@ struct ExternalResultDescription
Block sample_block;
std::vector<std::pair<ValueType, bool /* is_nullable */>> types;
ExternalResultDescription() = default;
explicit ExternalResultDescription(const Block & sample_block_);
void init(const Block & sample_block_);
};

View File

@ -36,7 +36,7 @@ void insertDefaultPostgreSQLValue(IColumn & column, const IColumn & sample_colum
void insertPostgreSQLValue(
IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type,
std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t idx)
const std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t idx)
{
switch (type)
{
@ -125,8 +125,8 @@ void insertPostgreSQLValue(
pqxx::array_parser parser{value};
std::pair<pqxx::array_parser::juncture, std::string> parsed = parser.get_next();
size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info[idx].num_dimensions;
const auto parse_value = array_info[idx].pqxx_parser;
size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info.at(idx).num_dimensions;
const auto parse_value = array_info.at(idx).pqxx_parser;
std::vector<Row> dimensions(expected_dimensions + 1);
while (parsed.first != pqxx::array_parser::juncture::done)
@ -138,7 +138,7 @@ void insertPostgreSQLValue(
dimensions[dimension].emplace_back(parse_value(parsed.second));
else if (parsed.first == pqxx::array_parser::juncture::null_value)
dimensions[dimension].emplace_back(array_info[idx].default_value);
dimensions[dimension].emplace_back(array_info.at(idx).default_value);
else if (parsed.first == pqxx::array_parser::juncture::row_end)
{

View File

@ -23,7 +23,7 @@ struct PostgreSQLArrayInfo
void insertPostgreSQLValue(
IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type,
std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t idx);
const std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t idx);
void preparePostgreSQLArrayInfo(
std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t column_idx, const DataTypePtr data_type);

View File

@ -186,20 +186,25 @@ PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList(
}
else
{
std::tuple<std::string, std::string, std::string, uint16_t, std::string, std::string> row;
std::tuple<std::string, std::string, std::string, uint16_t, std::string, std::string, std::string> row;
while (stream >> row)
{
auto data_type = convertPostgreSQLDataType(
const auto column_name = std::get<0>(row);
const auto data_type = convertPostgreSQLDataType(
std::get<1>(row), recheck_array,
use_nulls && (std::get<2>(row) == /* not nullable */"f"),
std::get<3>(row));
columns.push_back(NameAndTypePair(std::get<0>(row), data_type));
columns.push_back(NameAndTypePair(column_name, data_type));
auto attgenerated = std::get<6>(row);
LOG_TEST(&Poco::Logger::get("kssenii"), "KSSENII: attgenerated: {}", attgenerated);
attributes.emplace_back(
attributes.emplace(
column_name,
PostgreSQLTableStructure::PGAttribute{
.atttypid = parse<int>(std::get<4>(row)),
.atttypmod = parse<int>(std::get<5>(row)),
.attgenerated = attgenerated.empty() ? char{} : char(attgenerated[0])
});
++i;
@ -253,14 +258,19 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
PostgreSQLTableStructure table;
auto where = fmt::format("relname = {}", quoteString(postgres_table));
if (postgres_schema.empty())
where += " AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')";
else
where += fmt::format(" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = {})", quoteString(postgres_schema));
where += postgres_schema.empty()
? " AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')"
: fmt::format(" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = {})", quoteString(postgres_schema));
std::string query = fmt::format(
"SELECT attname AS name, format_type(atttypid, atttypmod) AS type, "
"attnotnull AS not_null, attndims AS dims, atttypid as type_id, atttypmod as type_modifier "
"SELECT attname AS name, " /// column name
"format_type(atttypid, atttypmod) AS type, " /// data type
"attnotnull AS not_null, " /// is nullable
"attndims AS dims, " /// array dimensions
"atttypid as type_id, "
"atttypmod as type_modifier, "
"attgenerated as generated " /// if column has GENERATED
"FROM pg_attribute "
"WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) "
"AND NOT attisdropped AND attnum > 0", where);
@ -271,11 +281,44 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
if (!table.physical_columns)
throw Exception(ErrorCodes::UNKNOWN_TABLE, "PostgreSQL table {} does not exist", postgres_table_with_schema);
for (const auto & column : table.physical_columns->columns)
{
table.physical_columns->names.push_back(column.name);
}
bool check_generated = table.physical_columns->attributes.end() != std::find_if(
table.physical_columns->attributes.begin(),
table.physical_columns->attributes.end(),
[](const auto & attr){ return attr.second.attgenerated == 's'; });
if (check_generated)
{
std::string attrdef_query = fmt::format(
"SELECT adnum, pg_get_expr(adbin, adrelid) as generated_expression "
"FROM pg_attrdef "
"WHERE adrelid = (SELECT oid FROM pg_class WHERE {});", where);
pqxx::result result{tx.exec(attrdef_query)};
for (const auto row : result)
{
size_t adnum = row[0].as<int>();
if (!adnum || adnum > table.physical_columns->names.size())
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Received adnum {}, but currently fetched columns list has {} columns",
adnum, table.physical_columns->attributes.size());
}
const auto column_name = table.physical_columns->names[adnum - 1];
table.physical_columns->attributes.at(column_name).attr_def = row[1].as<std::string>();
}
}
if (with_primary_key)
{
/// wiki.postgresql.org/wiki/Retrieve_primary_key_columns
query = fmt::format(
"SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type "
"SELECT a.attname, " /// column name
"format_type(a.atttypid, a.atttypmod) AS data_type " /// data type
"FROM pg_index i "
"JOIN pg_attribute a ON a.attrelid = i.indrelid "
"AND a.attnum = ANY(i.indkey) "

View File

@ -16,13 +16,17 @@ struct PostgreSQLTableStructure
{
Int32 atttypid;
Int32 atttypmod;
bool atthasdef;
char attgenerated;
std::string attr_def;
};
using Attributes = std::vector<PGAttribute>;
using Attributes = std::unordered_map<std::string, PGAttribute>;
struct ColumnsInfo
{
NamesAndTypesList columns;
Attributes attributes;
std::vector<std::string> names;
ColumnsInfo(NamesAndTypesList && columns_, Attributes && attributes_) : columns(columns_), attributes(attributes_) {}
};
using ColumnsInfoPtr = std::shared_ptr<ColumnsInfo>;

View File

@ -24,6 +24,22 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
namespace
{
using ArrayInfo = std::unordered_map<size_t, PostgreSQLArrayInfo>;
ArrayInfo createArrayInfos(const NamesAndTypesList & columns, const ExternalResultDescription & columns_description)
{
ArrayInfo array_info;
for (size_t i = 0; i < columns.size(); ++i)
{
if (columns_description.types[i].first == ExternalResultDescription::ValueType::vtArray)
preparePostgreSQLArrayInfo(array_info, i, columns_description.sample_block.getByPosition(i).type);
}
return array_info;
}
}
MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
ContextPtr context_,
std::shared_ptr<postgres::Connection> connection_,
@ -40,126 +56,160 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
, publication_name(publication_name_)
, connection(connection_)
, current_lsn(start_lsn)
, final_lsn(start_lsn)
, lsn_value(getLSNValue(start_lsn))
, max_block_size(max_block_size_)
, schema_as_a_part_of_table_name(schema_as_a_part_of_table_name_)
{
final_lsn = start_lsn;
{
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
current_lsn = advanceLSN(tx);
LOG_TRACE(log, "Starting replication. LSN: {} (last: {})", getLSNValue(current_lsn), getLSNValue(final_lsn));
tx->commit();
}
for (const auto & [table_name, storage_info] : storages_info_)
storages.emplace(table_name, storage_info);
storages.emplace(table_name, StorageData(storage_info, log));
LOG_TRACE(log, "Starting replication. LSN: {} (last: {}), storages: {}",
getLSNValue(current_lsn), getLSNValue(final_lsn), storages.size());
}
MaterializedPostgreSQLConsumer::StorageData::StorageData(const StorageInfo & storage_info)
: storage(storage_info.storage), buffer(storage_info.storage->getInMemoryMetadataPtr(), storage_info.attributes)
MaterializedPostgreSQLConsumer::StorageData::StorageData(const StorageInfo & storage_info, Poco::Logger * log_)
: storage(storage_info.storage)
, table_description(storage_info.storage->getInMemoryMetadataPtr()->getSampleBlock())
, columns_attributes(storage_info.attributes)
, array_info(createArrayInfos(storage_info.storage->getInMemoryMetadataPtr()->getColumns().getAllPhysical(), table_description))
{
auto table_id = storage_info.storage->getStorageID();
LOG_TRACE(&Poco::Logger::get("StorageMaterializedPostgreSQL"),
"New buffer for table {}, number of attributes: {}, number if columns: {}, structure: {}",
table_id.getNameForLogs(), buffer.attributes.size(), buffer.getColumnsNum(), buffer.description.sample_block.dumpStructure());
auto columns_num = table_description.sample_block.columns();
/// +2 because of _sign and _version columns
if (columns_attributes.size() + 2 != columns_num)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Columns number mismatch. Attributes: {}, buffer: {}",
columns_attributes.size(), columns_num);
}
LOG_TRACE(log_, "Adding definition for table {}, structure: {}",
storage_info.storage->getStorageID().getNameForLogs(),
table_description.sample_block.dumpStructure());
}
MaterializedPostgreSQLConsumer::StorageData::Buffer::Buffer(
StorageMetadataPtr storage_metadata, const PostgreSQLTableStructure::Attributes & attributes_)
: attributes(attributes_)
ColumnsWithTypeAndName && columns_,
const ExternalResultDescription & table_description_)
{
const Block sample_block = storage_metadata->getSampleBlock();
/// Need to clear type, because in description.init() the types are appended
description.types.clear();
description.init(sample_block);
columns = description.sample_block.cloneEmptyColumns();
const auto & storage_columns = storage_metadata->getColumns().getAllPhysical();
auto insert_columns = std::make_shared<ASTExpressionList>();
auto columns_num = description.sample_block.columns();
assert(columns_num == storage_columns.size());
if (attributes.size() + 2 != columns_num) /// +2 because sign and version columns
throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns number mismatch. Attributes: {}, buffer: {}",
attributes.size(), columns_num);
size_t idx = 0;
for (const auto & column : storage_columns)
if (columns_.end() != std::find_if(
columns_.begin(), columns_.end(),
[](const auto & col) { return col.name == "_sign" || col.name == "_version"; }))
{
if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)
preparePostgreSQLArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type);
idx++;
insert_columns->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
throw Exception(ErrorCodes::LOGICAL_ERROR,
"PostgreSQL table cannot contain `_sign` or `_version` columns "
"as they are reserved for internal usage");
}
columns_ast = std::move(insert_columns);
columns_.push_back(table_description_.sample_block.getByName("_sign"));
columns_.push_back(table_description_.sample_block.getByName("_version"));
for (const auto & col : columns_)
{
if (!table_description_.sample_block.has(col.name))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Having column {}, but no such column in table ({})",
col.name, table_description_.sample_block.dumpStructure());
}
void MaterializedPostgreSQLConsumer::assertCorrectInsertion(StorageData::Buffer & buffer, size_t column_idx)
const auto & actual_column = table_description_.sample_block.getByName(col.name);
if (col.type != actual_column.type)
{
if (column_idx >= buffer.description.sample_block.columns()
|| column_idx >= buffer.description.types.size()
|| column_idx >= buffer.columns.size())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Having column {} of type {}, but expected {}",
col.name, col.type->getName(), actual_column.type->getName());
}
}
sample_block = Block(columns_);
columns = sample_block.cloneEmptyColumns();
for (const auto & name : sample_block.getNames())
columns_ast.children.emplace_back(std::make_shared<ASTIdentifier>(name));
}
MaterializedPostgreSQLConsumer::StorageData::Buffer & MaterializedPostgreSQLConsumer::StorageData::getBuffer()
{
if (!buffer)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Data buffer not initialized for {}",
storage->getStorageID().getNameForLogs());
}
return *buffer;
}
void MaterializedPostgreSQLConsumer::StorageData::Buffer::assertInsertIsPossible(size_t col_idx) const
{
if (col_idx >= columns.size())
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Attempt to insert into buffer at position: "
"{}, but block columns size is {}, types size: {}, columns size: {}, buffer structure: {}",
column_idx,
buffer.description.sample_block.columns(),
buffer.description.types.size(), buffer.columns.size(),
buffer.description.sample_block.dumpStructure());
"{}, but block columns size is {} (full structure: {})",
col_idx, columns.size(), sample_block.dumpStructure());
}
}
void MaterializedPostgreSQLConsumer::insertValue(StorageData::Buffer & buffer, const std::string & value, size_t column_idx)
void MaterializedPostgreSQLConsumer::insertValue(StorageData & storage_data, const std::string & value, size_t column_idx)
{
assertCorrectInsertion(buffer, column_idx);
auto & buffer = storage_data.getBuffer();
buffer.assertInsertIsPossible(column_idx);
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
bool is_nullable = buffer.description.types[column_idx].second;
const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx);
auto & column = buffer.columns[column_idx];
const size_t column_idx_in_table = storage_data.table_description.sample_block.getPositionByName(column_type_and_name.name);
const auto & type_description = storage_data.table_description.types[column_idx_in_table];
try
{
if (is_nullable)
if (column_type_and_name.type->isNullable())
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*buffer.columns[column_idx]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*column);
const auto & data_type = assert_cast<const DataTypeNullable &>(*column_type_and_name.type);
insertPostgreSQLValue(
column_nullable.getNestedColumn(), value,
buffer.description.types[column_idx].first, data_type.getNestedType(), buffer.array_info, column_idx);
column_nullable.getNestedColumn(), value, type_description.first,
data_type.getNestedType(), storage_data.array_info, column_idx_in_table);
column_nullable.getNullMapData().emplace_back(0);
}
else
{
insertPostgreSQLValue(
*buffer.columns[column_idx], value,
buffer.description.types[column_idx].first, sample.type,
buffer.array_info, column_idx);
*column, value, type_description.first, column_type_and_name.type,
storage_data.array_info, column_idx_in_table);
}
}
catch (const pqxx::conversion_error & e)
{
LOG_ERROR(log, "Conversion failed while inserting PostgreSQL value {}, will insert default value. Error: {}", value, e.what());
insertDefaultValue(buffer, column_idx);
LOG_ERROR(log, "Conversion failed while inserting PostgreSQL value {}, "
"will insert default value. Error: {}", value, e.what());
insertDefaultPostgreSQLValue(*column, *column_type_and_name.column);
}
}
void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData::Buffer & buffer, size_t column_idx)
void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData & storage_data, size_t column_idx)
{
assertCorrectInsertion(buffer, column_idx);
auto & buffer = storage_data.getBuffer();
buffer.assertInsertIsPossible(column_idx);
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
insertDefaultPostgreSQLValue(*buffer.columns[column_idx], *sample.column);
const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx);
auto & column = buffer.columns[column_idx];
insertDefaultPostgreSQLValue(*column, *column_type_and_name.column);
}
void MaterializedPostgreSQLConsumer::readString(const char * message, size_t & pos, size_t size, String & result)
{
assert(size > pos + 2);
@ -173,7 +223,6 @@ void MaterializedPostgreSQLConsumer::readString(const char * message, size_t & p
}
}
template<typename T>
T MaterializedPostgreSQLConsumer::unhexN(const char * message, size_t pos, size_t n)
{
@ -186,7 +235,6 @@ T MaterializedPostgreSQLConsumer::unhexN(const char * message, size_t pos, size_
return result;
}
Int64 MaterializedPostgreSQLConsumer::readInt64(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size >= pos + 16);
@ -195,7 +243,6 @@ Int64 MaterializedPostgreSQLConsumer::readInt64(const char * message, size_t & p
return result;
}
Int32 MaterializedPostgreSQLConsumer::readInt32(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size >= pos + 8);
@ -204,7 +251,6 @@ Int32 MaterializedPostgreSQLConsumer::readInt32(const char * message, size_t & p
return result;
}
Int16 MaterializedPostgreSQLConsumer::readInt16(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size >= pos + 4);
@ -213,7 +259,6 @@ Int16 MaterializedPostgreSQLConsumer::readInt16(const char * message, size_t & p
return result;
}
Int8 MaterializedPostgreSQLConsumer::readInt8(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size >= pos + 2);
@ -222,25 +267,23 @@ Int8 MaterializedPostgreSQLConsumer::readInt8(const char * message, size_t & pos
return result;
}
void MaterializedPostgreSQLConsumer::readTupleData(
StorageData::Buffer & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value)
StorageData & storage_data,
const char * message,
size_t & pos,
size_t size,
PostgreSQLQuery type,
bool old_value)
{
Int16 num_columns = readInt16(message, pos, size);
/// Sanity check. In fact, it was already checked.
if (static_cast<size_t>(num_columns) + 2 != buffer.getColumnsNum()) /// +2 -- sign and version columns
throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR,
"Number of columns does not match. Got: {}, expected {}, current buffer structure: {}",
num_columns, buffer.getColumnsNum(), buffer.description.sample_block.dumpStructure());
auto proccess_column_value = [&](Int8 identifier, Int16 column_idx)
{
switch (identifier) // NOLINT(bugprone-switch-missing-default-case)
{
case 'n': /// NULL
{
insertDefaultValue(buffer, column_idx);
insertDefaultValue(storage_data, column_idx);
break;
}
case 't': /// Text formatted value
@ -250,7 +293,7 @@ void MaterializedPostgreSQLConsumer::readTupleData(
for (Int32 i = 0; i < col_len; ++i)
value += readInt8(message, pos, size);
insertValue(buffer, value, column_idx);
insertValue(storage_data, value, column_idx);
break;
}
case 'u': /// TOAST value && unchanged at the same time. Actual value is not sent.
@ -258,13 +301,13 @@ void MaterializedPostgreSQLConsumer::readTupleData(
/// TOAST values are not supported. (TOAST values are values that are considered in postgres
/// to be too large to be stored directly)
LOG_WARNING(log, "Got TOAST value, which is not supported, default value will be used instead.");
insertDefaultValue(buffer, column_idx);
insertDefaultValue(storage_data, column_idx);
break;
}
case 'b': /// Binary data.
{
LOG_WARNING(log, "We do not yet process this format of data, will insert default value");
insertDefaultValue(buffer, column_idx);
insertDefaultValue(storage_data, column_idx);
break;
}
default:
@ -272,7 +315,7 @@ void MaterializedPostgreSQLConsumer::readTupleData(
LOG_WARNING(log, "Unexpected identifier: {}. This is a bug! Please report an issue on github", identifier);
chassert(false);
insertDefaultValue(buffer, column_idx);
insertDefaultValue(storage_data, column_idx);
break;
}
}
@ -291,7 +334,7 @@ void MaterializedPostgreSQLConsumer::readTupleData(
"Got error while receiving value for column {}, will insert default value. Error: {}",
column_idx, getCurrentExceptionMessage(true));
insertDefaultValue(buffer, column_idx);
insertDefaultValue(storage_data, column_idx);
/// Let's collect only the first exception.
/// This delaying of error throw is needed because
/// some errors can be ignored and just logged,
@ -301,19 +344,20 @@ void MaterializedPostgreSQLConsumer::readTupleData(
}
}
auto & columns = storage_data.getBuffer().columns;
switch (type)
{
case PostgreSQLQuery::INSERT:
{
buffer.columns[num_columns]->insert(static_cast<Int8>(1));
buffer.columns[num_columns + 1]->insert(lsn_value);
columns[num_columns]->insert(static_cast<Int8>(1));
columns[num_columns + 1]->insert(lsn_value);
break;
}
case PostgreSQLQuery::DELETE:
{
buffer.columns[num_columns]->insert(static_cast<Int8>(-1));
buffer.columns[num_columns + 1]->insert(lsn_value);
columns[num_columns]->insert(static_cast<Int8>(-1));
columns[num_columns + 1]->insert(lsn_value);
break;
}
@ -321,11 +365,11 @@ void MaterializedPostgreSQLConsumer::readTupleData(
{
/// Process old value in case changed value is a primary key.
if (old_value)
buffer.columns[num_columns]->insert(static_cast<Int8>(-1));
columns[num_columns]->insert(static_cast<Int8>(-1));
else
buffer.columns[num_columns]->insert(static_cast<Int8>(1));
columns[num_columns]->insert(static_cast<Int8>(1));
buffer.columns[num_columns + 1]->insert(lsn_value);
columns[num_columns + 1]->insert(lsn_value);
break;
}
@ -335,7 +379,6 @@ void MaterializedPostgreSQLConsumer::readTupleData(
std::rethrow_exception(error);
}
/// https://www.postgresql.org/docs/13/protocol-logicalrep-message-formats.html
void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * replication_message, size_t size)
{
@ -366,10 +409,10 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
return;
Int8 new_tuple = readInt8(replication_message, pos, size);
auto & buffer = storages.find(table_name)->second.buffer;
auto & storage_data = storages.find(table_name)->second;
if (new_tuple)
readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::INSERT);
readTupleData(storage_data, replication_message, pos, size, PostgreSQLQuery::INSERT);
break;
}
@ -386,7 +429,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
if (!isSyncAllowed(relation_id, table_name))
return;
auto & buffer = storages.find(table_name)->second.buffer;
auto & storage_data = storages.find(table_name)->second;
auto proccess_identifier = [&](Int8 identifier) -> bool
{
@ -401,13 +444,13 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
/// it is much more efficient to use replica identity index, but support all possible cases.
case 'O':
{
readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::UPDATE, true);
readTupleData(storage_data, replication_message, pos, size, PostgreSQLQuery::UPDATE, true);
break;
}
case 'N':
{
/// New row.
readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::UPDATE);
readTupleData(storage_data, replication_message, pos, size, PostgreSQLQuery::UPDATE);
read_next = false;
break;
}
@ -441,8 +484,8 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
/// 0 or 1 if replica identity is set to full. For now only default replica identity is supported (with primary keys).
readInt8(replication_message, pos, size);
auto & buffer = storages.find(table_name)->second.buffer;
readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::DELETE);
auto & storage_data = storages.find(table_name)->second;
readTupleData(storage_data, replication_message, pos, size, PostgreSQLQuery::DELETE);
break;
}
case 'C': // Commit
@ -490,8 +533,6 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
return;
}
auto & buffer = storage_iter->second.buffer;
/// 'd' - default (primary key if any)
/// 'n' - nothing
/// 'f' - all columns (set replica identity full)
@ -509,29 +550,13 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
Int16 num_columns = readInt16(replication_message, pos, size);
if (static_cast<size_t>(num_columns) + 2 != buffer.getColumnsNum()) /// +2 -- sign and version columns
{
markTableAsSkipped(relation_id, table_name);
return;
}
if (static_cast<size_t>(num_columns) != buffer.attributes.size())
{
#ifndef NDEBUG
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Mismatch in attributes size. Got {}, expected {}. It's a bug. Current buffer structure: {}",
num_columns, buffer.attributes.size(), buffer.description.sample_block.dumpStructure());
#else
LOG_ERROR(log, "Mismatch in attributes size. Got {}, expected {}. It's a bug. Current buffer structure: {}",
num_columns, buffer.attributes.size(), buffer.description.sample_block.dumpStructure());
markTableAsSkipped(relation_id, table_name);
return;
#endif
}
Int32 data_type_id;
Int32 type_modifier; /// For example, n in varchar(n)
auto & storage_data = storage_iter->second;
const auto & description = storage_data.table_description;
ColumnsWithTypeAndName columns;
for (uint16_t i = 0; i < num_columns; ++i)
{
String column_name;
@ -541,13 +566,22 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
data_type_id = readInt32(replication_message, pos, size);
type_modifier = readInt32(replication_message, pos, size);
if (buffer.attributes[i].atttypid != data_type_id || buffer.attributes[i].atttypmod != type_modifier)
columns.push_back(description.sample_block.getByName(column_name));
const auto & attributes_it = storage_data.columns_attributes.find(column_name);
if (attributes_it == storage_data.columns_attributes.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column: {}", column_name);
const auto & attributes = attributes_it->second;
if (attributes.atttypid != data_type_id || attributes.atttypmod != type_modifier)
{
LOG_TEST(log, "Column {} has a different type", column_name);
markTableAsSkipped(relation_id, table_name);
return;
}
}
storage_data.setBuffer(std::make_unique<StorageData::Buffer>(std::move(columns), description));
tables_to_sync.insert(table_name);
break;
}
@ -563,7 +597,6 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
}
}
void MaterializedPostgreSQLConsumer::syncTables()
{
size_t synced_tables = 0;
@ -571,8 +604,8 @@ void MaterializedPostgreSQLConsumer::syncTables()
{
auto table_name = *tables_to_sync.begin();
auto & storage_data = storages.find(table_name)->second;
Block result_rows = storage_data.buffer.description.sample_block.cloneWithColumns(std::move(storage_data.buffer.columns));
storage_data.buffer.columns = storage_data.buffer.description.sample_block.cloneEmptyColumns();
auto & buffer = storage_data.getBuffer();
Block result_rows = buffer.sample_block.cloneWithColumns(std::move(buffer.columns));
try
{
@ -585,7 +618,7 @@ void MaterializedPostgreSQLConsumer::syncTables()
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = storage->getStorageID();
insert->columns = storage_data.buffer.columns_ast;
insert->columns = std::make_shared<ASTExpressionList>(buffer.columns_ast);
InterpreterInsertQuery interpreter(insert, insert_context, true);
auto io = interpreter.execute();
@ -603,10 +636,11 @@ void MaterializedPostgreSQLConsumer::syncTables()
catch (...)
{
/// Retry this buffer later.
storage_data.buffer.columns = result_rows.mutateColumns();
buffer.columns = result_rows.mutateColumns();
throw;
}
storage_data.setBuffer(nullptr);
tables_to_sync.erase(tables_to_sync.begin());
}
@ -616,7 +650,6 @@ void MaterializedPostgreSQLConsumer::syncTables()
updateLsn();
}
void MaterializedPostgreSQLConsumer::updateLsn()
{
try
@ -632,7 +665,6 @@ void MaterializedPostgreSQLConsumer::updateLsn()
}
}
String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontransaction> tx)
{
std::string query_str = fmt::format("SELECT end_lsn FROM pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn);
@ -644,7 +676,6 @@ String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontrans
return final_lsn;
}
/// Sync for some table might not be allowed if:
/// 1. Table schema changed and might break synchronization.
/// 2. There is no storage for this table. (As a result of some exception or incorrect pg_publication)
@ -700,7 +731,6 @@ bool MaterializedPostgreSQLConsumer::isSyncAllowed(Int32 relation_id, const Stri
return false;
}
void MaterializedPostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const String & relation_name)
{
skip_list.insert({relation_id, ""}); /// Empty lsn string means - continue waiting for valid lsn.
@ -712,12 +742,11 @@ void MaterializedPostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const
relation_name, relation_id);
}
void MaterializedPostgreSQLConsumer::addNested(
const String & postgres_table_name, StorageInfo nested_storage_info, const String & table_start_lsn)
{
assert(!storages.contains(postgres_table_name));
storages.emplace(postgres_table_name, nested_storage_info);
storages.emplace(postgres_table_name, StorageData(nested_storage_info, log));
auto it = deleted_tables.find(postgres_table_name);
if (it != deleted_tables.end())
@ -728,17 +757,15 @@ void MaterializedPostgreSQLConsumer::addNested(
waiting_list[postgres_table_name] = table_start_lsn;
}
void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, StorageInfo nested_storage_info, Int32 table_id, const String & table_start_lsn)
{
assert(!storages.contains(table_name));
storages.emplace(table_name, nested_storage_info);
storages.emplace(table_name, StorageData(nested_storage_info, log));
/// Set start position to valid lsn. Before it was an empty string. Further read for table allowed, if it has a valid lsn.
skip_list[table_id] = table_start_lsn;
}
void MaterializedPostgreSQLConsumer::removeNested(const String & postgres_table_name)
{
auto it = storages.find(postgres_table_name);
@ -747,7 +774,6 @@ void MaterializedPostgreSQLConsumer::removeNested(const String & postgres_table_
deleted_tables.insert(postgres_table_name);
}
void MaterializedPostgreSQLConsumer::setSetting(const SettingChange & setting)
{
if (setting.name == "materialized_postgresql_max_block_size")
@ -756,7 +782,6 @@ void MaterializedPostgreSQLConsumer::setSetting(const SettingChange & setting)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported setting: {}", setting.name);
}
/// Read binary changes from replication slot via COPY command (starting from current lsn in a slot).
bool MaterializedPostgreSQLConsumer::consume()
{

View File

@ -32,32 +32,36 @@ class MaterializedPostgreSQLConsumer
private:
struct StorageData
{
explicit StorageData(const StorageInfo & storage_info, Poco::Logger * log_);
size_t getColumnsNum() const { return table_description.sample_block.columns(); }
const Block & getSampleBlock() const { return table_description.sample_block; }
using ArrayInfo = std::unordered_map<size_t, PostgreSQLArrayInfo>;
const StoragePtr storage;
const ExternalResultDescription table_description;
const PostgreSQLTableStructure::Attributes columns_attributes;
const ArrayInfo array_info;
struct Buffer
{
ExternalResultDescription description;
Block sample_block;
MutableColumns columns;
ASTExpressionList columns_ast;
/// Needed to pass to insert query columns list in syncTables().
std::shared_ptr<ASTExpressionList> columns_ast;
/// Needed for insertPostgreSQLValue() method to parse array
std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;
/// To validate ddl.
PostgreSQLTableStructure::Attributes attributes;
explicit Buffer(ColumnsWithTypeAndName && columns_, const ExternalResultDescription & table_description_);
Buffer(StorageMetadataPtr storage_metadata, const PostgreSQLTableStructure::Attributes & attributes_);
size_t getColumnsNum() const
{
const auto & sample_block = description.sample_block;
return sample_block.columns();
}
void assertInsertIsPossible(size_t col_idx) const;
};
StoragePtr storage;
Buffer buffer;
Buffer & getBuffer();
explicit StorageData(const StorageInfo & storage_info);
StorageData(const StorageData & other) = delete;
void setBuffer(std::unique_ptr<Buffer> buffer_) { buffer = std::move(buffer_); }
private:
std::unique_ptr<Buffer> buffer;
};
using Storages = std::unordered_map<String, StorageData>;
@ -97,8 +101,8 @@ private:
bool isSyncAllowed(Int32 relation_id, const String & relation_name);
static void insertDefaultValue(StorageData::Buffer & buffer, size_t column_idx);
void insertValue(StorageData::Buffer & buffer, const std::string & value, size_t column_idx);
static void insertDefaultValue(StorageData & storage_data, size_t column_idx);
void insertValue(StorageData & storage_data, const std::string & value, size_t column_idx);
enum class PostgreSQLQuery
{
@ -107,7 +111,7 @@ private:
DELETE
};
void readTupleData(StorageData::Buffer & buffer, const char * message, size_t & pos, size_t size, PostgreSQLQuery type, bool old_value = false);
void readTupleData(StorageData & storage_data, const char * message, size_t & pos, size_t size, PostgreSQLQuery type, bool old_value = false);
template<typename T>
static T unhexN(const char * message, size_t pos, size_t n);
@ -119,8 +123,6 @@ private:
void markTableAsSkipped(Int32 relation_id, const String & relation_name);
static void assertCorrectInsertion(StorageData::Buffer & buffer, size_t column_idx);
/// lsn - log sequence number, like wal offset (64 bit).
static Int64 getLSNValue(const std::string & lsn)
{

View File

@ -337,6 +337,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
dropReplicationSlot(tx);
initial_sync();
LOG_DEBUG(log, "Loaded {} tables", nested_storages.size());
}
/// Synchronization and initial load already took place - do not create any new tables, just fetch StoragePtr's
/// and pass them to replication consumer.
@ -414,16 +415,18 @@ StorageInfo PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection
std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name);
tx->exec(query_str);
auto table_structure = fetchTableStructure(*tx, table_name);
if (!table_structure->physical_columns)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No table attributes");
auto table_attributes = table_structure->physical_columns->attributes;
/// Load from snapshot, which will show table state before creation of replication slot.
/// Already connected to needed database, no need to add it to query.
auto quoted_name = doubleQuoteWithSchema(table_name);
query_str = fmt::format("SELECT * FROM ONLY {}", quoted_name);
LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name);
auto table_structure = fetchTableStructure(*tx, table_name);
if (!table_structure->physical_columns)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No table attributes");
auto table_attributes = table_structure->physical_columns->attributes;
LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name);
auto table_override = tryGetTableOverride(current_database_name, table_name);
materialized_storage->createNestedIfNeeded(std::move(table_structure), table_override ? table_override->as<ASTTableOverride>() : nullptr);
@ -444,12 +447,16 @@ StorageInfo PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection
assertBlocksHaveEqualStructure(input->getPort().getHeader(), block_io.pipeline.getHeader(), "postgresql replica load from snapshot");
block_io.pipeline.complete(Pipe(std::move(input)));
/// TODO: make a test when we fail in the middle of inserting data from source.
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
materialized_storage->set(nested_storage);
auto nested_table_id = nested_storage->getStorageID();
LOG_DEBUG(log, "Loaded table {}.{} (uuid: {})", nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid));
LOG_DEBUG(log, "Loaded table {}.{} (uuid: {})",
nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid));
return StorageInfo(nested_storage, std::move(table_attributes));
}

View File

@ -25,6 +25,8 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/formatAST.h>
#include <Interpreters/applyTableOverride.h>
#include <Interpreters/InterpreterDropQuery.h>
@ -195,7 +197,8 @@ void StorageMaterializedPostgreSQL::createNestedIfNeeded(PostgreSQLTableStructur
const auto ast_create = getCreateNestedTableQuery(std::move(table_structure), table_override);
auto table_id = getStorageID();
auto tmp_nested_table_id = StorageID(table_id.database_name, getNestedTableName());
LOG_DEBUG(log, "Creating clickhouse table for postgresql table {}", table_id.getNameForLogs());
LOG_DEBUG(log, "Creating clickhouse table for postgresql table {} (ast: {})",
table_id.getNameForLogs(), serializeAST(*ast_create));
InterpreterCreateQuery interpreter(ast_create, nested_context);
interpreter.execute();
@ -359,7 +362,8 @@ ASTPtr StorageMaterializedPostgreSQL::getColumnDeclaration(const DataTypePtr & d
}
std::shared_ptr<ASTExpressionList> StorageMaterializedPostgreSQL::getColumnsExpressionList(const NamesAndTypesList & columns) const
std::shared_ptr<ASTExpressionList>
StorageMaterializedPostgreSQL::getColumnsExpressionList(const NamesAndTypesList & columns, std::unordered_map<std::string, ASTPtr> defaults) const
{
auto columns_expression_list = std::make_shared<ASTExpressionList>();
for (const auto & [name, type] : columns)
@ -369,6 +373,12 @@ std::shared_ptr<ASTExpressionList> StorageMaterializedPostgreSQL::getColumnsExpr
column_declaration->name = name;
column_declaration->type = getColumnDeclaration(type);
if (auto it = defaults.find(name); it != defaults.end())
{
column_declaration->default_expression = it->second;
column_declaration->default_specifier = "DEFAULT";
}
columns_expression_list->children.emplace_back(column_declaration);
}
return columns_expression_list;
@ -460,8 +470,28 @@ ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(
}
else
{
ordinary_columns_and_types = table_structure->physical_columns->columns;
columns_declare_list->set(columns_declare_list->columns, getColumnsExpressionList(ordinary_columns_and_types));
const auto columns = table_structure->physical_columns;
std::unordered_map<std::string, ASTPtr> defaults;
for (const auto & col : columns->columns)
{
const auto & attr = columns->attributes.at(col.name);
if (!attr.attr_def.empty())
{
ParserExpression expr_parser;
Expected expected;
ASTPtr result;
Tokens tokens(attr.attr_def.data(), attr.attr_def.data() + attr.attr_def.size());
IParser::Pos pos(tokens, DBMS_DEFAULT_MAX_PARSER_DEPTH);
if (!expr_parser.parse(pos, result, expected))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failed to parse default expression: {}", attr.attr_def);
}
defaults.emplace(col.name, result);
}
}
ordinary_columns_and_types = columns->columns;
columns_declare_list->set(columns_declare_list->columns, getColumnsExpressionList(ordinary_columns_and_types, defaults));
}
if (ordinary_columns_and_types.empty())

View File

@ -109,7 +109,8 @@ public:
ASTPtr getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure, const ASTTableOverride * table_override);
std::shared_ptr<ASTExpressionList> getColumnsExpressionList(const NamesAndTypesList & columns) const;
std::shared_ptr<ASTExpressionList> getColumnsExpressionList(
const NamesAndTypesList & columns, std::unordered_map<std::string, ASTPtr> defaults = {}) const;
StoragePtr getNested() const;

View File

@ -810,6 +810,150 @@ def test_replica_consumer(started_cluster):
pg_manager_instance2.clear()
def test_replica_consumer(started_cluster):
table = "test_replica_consumer"
pg_manager_instance2.restart()
pg_manager.create_postgres_table(table)
instance.query(
f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(0, 50)"
)
for pm in [pg_manager, pg_manager_instance2]:
pm.create_materialized_db(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=[
f"materialized_postgresql_tables_list = '{table}'",
"materialized_postgresql_backoff_min_ms = 100",
"materialized_postgresql_backoff_max_ms = 100",
"materialized_postgresql_use_unique_replication_consumer_identifier = 1",
],
)
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
check_tables_are_synchronized(
instance2, table, postgres_database=pg_manager_instance2.get_default_database()
)
assert 50 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
assert 50 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))
instance.query(
f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(1000, 1000)"
)
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
check_tables_are_synchronized(
instance2, table, postgres_database=pg_manager_instance2.get_default_database()
)
assert 1050 == int(instance.query(f"SELECT count() FROM test_database.{table}"))
assert 1050 == int(instance2.query(f"SELECT count() FROM test_database.{table}"))
for pm in [pg_manager, pg_manager_instance2]:
pm.drop_materialized_db()
pg_manager_instance2.clear()
def test_generated_columns(started_cluster):
table = "test_generated_columns"
pg_manager.create_postgres_table(
table,
"",
f"""CREATE TABLE {table} (
key integer PRIMARY KEY,
x integer,
y integer GENERATED ALWAYS AS (x*2) STORED,
z text);
""",
)
pg_manager.execute(f"insert into {table} (key, x, z) values (1,1,'1');")
pg_manager.execute(f"insert into {table} (key, x, z) values (2,2,'2');")
pg_manager.create_materialized_db(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=[
f"materialized_postgresql_tables_list = '{table}'",
"materialized_postgresql_backoff_min_ms = 100",
"materialized_postgresql_backoff_max_ms = 100",
],
)
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
pg_manager.execute(f"insert into {table} (key, x, z) values (3,3,'3');")
pg_manager.execute(f"insert into {table} (key, x, z) values (4,4,'4');")
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
pg_manager.execute(f"insert into {table} (key, x, z) values (5,5,'5');")
pg_manager.execute(f"insert into {table} (key, x, z) values (6,6,'6');")
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
def test_default_columns(started_cluster):
table = "test_default_columns"
pg_manager.create_postgres_table(
table,
"",
f"""CREATE TABLE {table} (
key integer PRIMARY KEY,
x integer,
y text DEFAULT 'y1',
z integer,
a text DEFAULT 'a1',
b integer);
""",
)
pg_manager.execute(f"insert into {table} (key, x, z, b) values (1,1,1,1);")
pg_manager.execute(f"insert into {table} (key, x, z, b) values (2,2,2,2);")
pg_manager.create_materialized_db(
ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=[
f"materialized_postgresql_tables_list = '{table}'",
"materialized_postgresql_backoff_min_ms = 100",
"materialized_postgresql_backoff_max_ms = 100",
],
)
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
pg_manager.execute(f"insert into {table} (key, x, z, b) values (3,3,3,3);")
pg_manager.execute(f"insert into {table} (key, x, z, b) values (4,4,4,4);")
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
pg_manager.execute(f"insert into {table} (key, x, z, b) values (5,5,5,5);")
pg_manager.execute(f"insert into {table} (key, x, z, b) values (6,6,6,6);")
check_tables_are_synchronized(
instance, table, postgres_database=pg_manager.get_default_database()
)
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")