diff --git a/src/Core/ExternalResultDescription.cpp b/src/Core/ExternalResultDescription.cpp index 0700200a9ec..f7e8a69d355 100644 --- a/src/Core/ExternalResultDescription.cpp +++ b/src/Core/ExternalResultDescription.cpp @@ -20,6 +20,11 @@ namespace ErrorCodes extern const int UNKNOWN_TYPE; } +ExternalResultDescription::ExternalResultDescription(const Block & sample_block_) +{ + init(sample_block_); +} + void ExternalResultDescription::init(const Block & sample_block_) { sample_block = sample_block_; diff --git a/src/Core/ExternalResultDescription.h b/src/Core/ExternalResultDescription.h index a9ffe8b2ed2..b7d852b99cf 100644 --- a/src/Core/ExternalResultDescription.h +++ b/src/Core/ExternalResultDescription.h @@ -41,6 +41,9 @@ struct ExternalResultDescription Block sample_block; std::vector> types; + ExternalResultDescription() = default; + explicit ExternalResultDescription(const Block & sample_block_); + void init(const Block & sample_block_); }; diff --git a/src/Core/PostgreSQL/insertPostgreSQLValue.cpp b/src/Core/PostgreSQL/insertPostgreSQLValue.cpp index d2e8071c5de..2f041134f06 100644 --- a/src/Core/PostgreSQL/insertPostgreSQLValue.cpp +++ b/src/Core/PostgreSQL/insertPostgreSQLValue.cpp @@ -36,7 +36,7 @@ void insertDefaultPostgreSQLValue(IColumn & column, const IColumn & sample_colum void insertPostgreSQLValue( IColumn & column, std::string_view value, const ExternalResultDescription::ValueType type, const DataTypePtr data_type, - std::unordered_map & array_info, size_t idx) + const std::unordered_map & array_info, size_t idx) { switch (type) { @@ -125,8 +125,8 @@ void insertPostgreSQLValue( pqxx::array_parser parser{value}; std::pair parsed = parser.get_next(); - size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info[idx].num_dimensions; - const auto parse_value = array_info[idx].pqxx_parser; + size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info.at(idx).num_dimensions; + const auto parse_value = array_info.at(idx).pqxx_parser; std::vector dimensions(expected_dimensions + 1); while (parsed.first != pqxx::array_parser::juncture::done) @@ -138,7 +138,7 @@ void insertPostgreSQLValue( dimensions[dimension].emplace_back(parse_value(parsed.second)); else if (parsed.first == pqxx::array_parser::juncture::null_value) - dimensions[dimension].emplace_back(array_info[idx].default_value); + dimensions[dimension].emplace_back(array_info.at(idx).default_value); else if (parsed.first == pqxx::array_parser::juncture::row_end) { diff --git a/src/Core/PostgreSQL/insertPostgreSQLValue.h b/src/Core/PostgreSQL/insertPostgreSQLValue.h index b842d86ed47..3bc83292b96 100644 --- a/src/Core/PostgreSQL/insertPostgreSQLValue.h +++ b/src/Core/PostgreSQL/insertPostgreSQLValue.h @@ -23,7 +23,7 @@ struct PostgreSQLArrayInfo void insertPostgreSQLValue( IColumn & column, std::string_view value, const ExternalResultDescription::ValueType type, const DataTypePtr data_type, - std::unordered_map & array_info, size_t idx); + const std::unordered_map & array_info, size_t idx); void preparePostgreSQLArrayInfo( std::unordered_map & array_info, size_t column_idx, const DataTypePtr data_type); diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp index 2c97c92ba99..527936f1c19 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.cpp @@ -186,20 +186,25 @@ PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList( } else { - 
std::tuple row; + std::tuple row; while (stream >> row) { - auto data_type = convertPostgreSQLDataType( + const auto column_name = std::get<0>(row); + const auto data_type = convertPostgreSQLDataType( std::get<1>(row), recheck_array, use_nulls && (std::get<2>(row) == /* not nullable */"f"), std::get<3>(row)); - columns.push_back(NameAndTypePair(std::get<0>(row), data_type)); + columns.push_back(NameAndTypePair(column_name, data_type)); + auto attgenerated = std::get<6>(row); + LOG_TEST(&Poco::Logger::get("kssenii"), "KSSENII: attgenerated: {}", attgenerated); - attributes.emplace_back( - PostgreSQLTableStructure::PGAttribute{ - .atttypid = parse(std::get<4>(row)), - .atttypmod = parse(std::get<5>(row)), + attributes.emplace( + column_name, + PostgreSQLTableStructure::PGAttribute{ + .atttypid = parse(std::get<4>(row)), + .atttypmod = parse(std::get<5>(row)), + .attgenerated = attgenerated.empty() ? char{} : char(attgenerated[0]) }); ++i; @@ -253,14 +258,19 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure( PostgreSQLTableStructure table; auto where = fmt::format("relname = {}", quoteString(postgres_table)); - if (postgres_schema.empty()) - where += " AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')"; - else - where += fmt::format(" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = {})", quoteString(postgres_schema)); + + where += postgres_schema.empty() + ? " AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')" + : fmt::format(" AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = {})", quoteString(postgres_schema)); std::string query = fmt::format( - "SELECT attname AS name, format_type(atttypid, atttypmod) AS type, " - "attnotnull AS not_null, attndims AS dims, atttypid as type_id, atttypmod as type_modifier " + "SELECT attname AS name, " /// column name + "format_type(atttypid, atttypmod) AS type, " /// data type + "attnotnull AS not_null, " /// is nullable + "attndims AS dims, " /// array dimensions + "atttypid as type_id, " + "atttypmod as type_modifier, " + "attgenerated as generated " /// if column has GENERATED "FROM pg_attribute " "WHERE attrelid = (SELECT oid FROM pg_class WHERE {}) " "AND NOT attisdropped AND attnum > 0", where); @@ -271,11 +281,44 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure( if (!table.physical_columns) throw Exception(ErrorCodes::UNKNOWN_TABLE, "PostgreSQL table {} does not exist", postgres_table_with_schema); + for (const auto & column : table.physical_columns->columns) + { + table.physical_columns->names.push_back(column.name); + } + + bool check_generated = table.physical_columns->attributes.end() != std::find_if( + table.physical_columns->attributes.begin(), + table.physical_columns->attributes.end(), + [](const auto & attr){ return attr.second.attgenerated == 's'; }); + + if (check_generated) + { + std::string attrdef_query = fmt::format( + "SELECT adnum, pg_get_expr(adbin, adrelid) as generated_expression " + "FROM pg_attrdef " + "WHERE adrelid = (SELECT oid FROM pg_class WHERE {});", where); + + pqxx::result result{tx.exec(attrdef_query)}; + for (const auto row : result) + { + size_t adnum = row[0].as(); + if (!adnum || adnum > table.physical_columns->names.size()) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Received adnum {}, but currently fetched columns list has {} columns", + adnum, table.physical_columns->attributes.size()); + } + const auto column_name = table.physical_columns->names[adnum - 1]; + 
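+            /// Keep the column's DEFAULT/GENERATED expression as returned by pg_get_expr(); getCreateNestedTableQuery() later turns it into a DEFAULT clause on the nested ClickHouse table.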
table.physical_columns->attributes.at(column_name).attr_def = row[1].as(); + } + } + if (with_primary_key) { /// wiki.postgresql.org/wiki/Retrieve_primary_key_columns query = fmt::format( - "SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type " + "SELECT a.attname, " /// column name + "format_type(a.atttypid, a.atttypmod) AS data_type " /// data type "FROM pg_index i " "JOIN pg_attribute a ON a.attrelid = i.indrelid " "AND a.attnum = ANY(i.indkey) " diff --git a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h index 7cd21d353a2..81bf7b278fc 100644 --- a/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h +++ b/src/Databases/PostgreSQL/fetchPostgreSQLTableStructure.h @@ -16,13 +16,17 @@ struct PostgreSQLTableStructure { Int32 atttypid; Int32 atttypmod; + bool atthasdef; + char attgenerated; + std::string attr_def; }; - using Attributes = std::vector; + using Attributes = std::unordered_map; struct ColumnsInfo { NamesAndTypesList columns; Attributes attributes; + std::vector names; ColumnsInfo(NamesAndTypesList && columns_, Attributes && attributes_) : columns(columns_), attributes(attributes_) {} }; using ColumnsInfoPtr = std::shared_ptr; diff --git a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp index e7bd6be7b2b..6be1563d16c 100644 --- a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp +++ b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp @@ -24,6 +24,22 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +namespace +{ + using ArrayInfo = std::unordered_map; + + ArrayInfo createArrayInfos(const NamesAndTypesList & columns, const ExternalResultDescription & columns_description) + { + ArrayInfo array_info; + for (size_t i = 0; i < columns.size(); ++i) + { + if (columns_description.types[i].first == ExternalResultDescription::ValueType::vtArray) + preparePostgreSQLArrayInfo(array_info, i, columns_description.sample_block.getByPosition(i).type); + } + return array_info; + } +} + MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer( ContextPtr context_, std::shared_ptr connection_, @@ -40,126 +56,160 @@ MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer( , publication_name(publication_name_) , connection(connection_) , current_lsn(start_lsn) + , final_lsn(start_lsn) , lsn_value(getLSNValue(start_lsn)) , max_block_size(max_block_size_) , schema_as_a_part_of_table_name(schema_as_a_part_of_table_name_) { - final_lsn = start_lsn; - auto tx = std::make_shared(connection->getRef()); - current_lsn = advanceLSN(tx); - LOG_TRACE(log, "Starting replication. 
LSN: {} (last: {})", getLSNValue(current_lsn), getLSNValue(final_lsn)); - tx->commit(); - - for (const auto & [table_name, storage_info] : storages_info_) - storages.emplace(table_name, storage_info); -} - - -MaterializedPostgreSQLConsumer::StorageData::StorageData(const StorageInfo & storage_info) - : storage(storage_info.storage), buffer(storage_info.storage->getInMemoryMetadataPtr(), storage_info.attributes) -{ - auto table_id = storage_info.storage->getStorageID(); - LOG_TRACE(&Poco::Logger::get("StorageMaterializedPostgreSQL"), - "New buffer for table {}, number of attributes: {}, number if columns: {}, structure: {}", - table_id.getNameForLogs(), buffer.attributes.size(), buffer.getColumnsNum(), buffer.description.sample_block.dumpStructure()); -} - - -MaterializedPostgreSQLConsumer::StorageData::Buffer::Buffer( - StorageMetadataPtr storage_metadata, const PostgreSQLTableStructure::Attributes & attributes_) - : attributes(attributes_) -{ - const Block sample_block = storage_metadata->getSampleBlock(); - - /// Need to clear type, because in description.init() the types are appended - description.types.clear(); - description.init(sample_block); - - columns = description.sample_block.cloneEmptyColumns(); - const auto & storage_columns = storage_metadata->getColumns().getAllPhysical(); - auto insert_columns = std::make_shared(); - - auto columns_num = description.sample_block.columns(); - assert(columns_num == storage_columns.size()); - if (attributes.size() + 2 != columns_num) /// +2 because sign and version columns - throw Exception(ErrorCodes::LOGICAL_ERROR, "Columns number mismatch. Attributes: {}, buffer: {}", - attributes.size(), columns_num); - - size_t idx = 0; - for (const auto & column : storage_columns) { - if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray) - preparePostgreSQLArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type); - idx++; - - insert_columns->children.emplace_back(std::make_shared(column.name)); + auto tx = std::make_shared(connection->getRef()); + current_lsn = advanceLSN(tx); + tx->commit(); } - columns_ast = std::move(insert_columns); + for (const auto & [table_name, storage_info] : storages_info_) + storages.emplace(table_name, StorageData(storage_info, log)); + + LOG_TRACE(log, "Starting replication. LSN: {} (last: {}), storages: {}", + getLSNValue(current_lsn), getLSNValue(final_lsn), storages.size()); } -void MaterializedPostgreSQLConsumer::assertCorrectInsertion(StorageData::Buffer & buffer, size_t column_idx) +MaterializedPostgreSQLConsumer::StorageData::StorageData(const StorageInfo & storage_info, Poco::Logger * log_) + : storage(storage_info.storage) + , table_description(storage_info.storage->getInMemoryMetadataPtr()->getSampleBlock()) + , columns_attributes(storage_info.attributes) + , array_info(createArrayInfos(storage_info.storage->getInMemoryMetadataPtr()->getColumns().getAllPhysical(), table_description)) { - if (column_idx >= buffer.description.sample_block.columns() - || column_idx >= buffer.description.types.size() - || column_idx >= buffer.columns.size()) - throw Exception( - ErrorCodes::LOGICAL_ERROR, + auto columns_num = table_description.sample_block.columns(); + /// +2 because of _sign and _version columns + if (columns_attributes.size() + 2 != columns_num) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Columns number mismatch. 
Attributes: {}, buffer: {}", + columns_attributes.size(), columns_num); + } + + LOG_TRACE(log_, "Adding definition for table {}, structure: {}", + storage_info.storage->getStorageID().getNameForLogs(), + table_description.sample_block.dumpStructure()); +} + +MaterializedPostgreSQLConsumer::StorageData::Buffer::Buffer( + ColumnsWithTypeAndName && columns_, + const ExternalResultDescription & table_description_) +{ + if (columns_.end() != std::find_if( + columns_.begin(), columns_.end(), + [](const auto & col) { return col.name == "_sign" || col.name == "_version"; })) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "PostgreSQL table cannot contain `_sign` or `_version` columns " + "as they are reserved for internal usage"); + } + + columns_.push_back(table_description_.sample_block.getByName("_sign")); + columns_.push_back(table_description_.sample_block.getByName("_version")); + + for (const auto & col : columns_) + { + if (!table_description_.sample_block.has(col.name)) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Having column {}, but no such column in table ({})", + col.name, table_description_.sample_block.dumpStructure()); + } + + const auto & actual_column = table_description_.sample_block.getByName(col.name); + if (col.type != actual_column.type) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Having column {} of type {}, but expected {}", + col.name, col.type->getName(), actual_column.type->getName()); + } + } + + sample_block = Block(columns_); + columns = sample_block.cloneEmptyColumns(); + + for (const auto & name : sample_block.getNames()) + columns_ast.children.emplace_back(std::make_shared(name)); +} + +MaterializedPostgreSQLConsumer::StorageData::Buffer & MaterializedPostgreSQLConsumer::StorageData::getBuffer() +{ + if (!buffer) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Data buffer not initialized for {}", + storage->getStorageID().getNameForLogs()); + } + + return *buffer; +} + +void MaterializedPostgreSQLConsumer::StorageData::Buffer::assertInsertIsPossible(size_t col_idx) const +{ + if (col_idx >= columns.size()) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to insert into buffer at position: " - "{}, but block columns size is {}, types size: {}, columns size: {}, buffer structure: {}", - column_idx, - buffer.description.sample_block.columns(), - buffer.description.types.size(), buffer.columns.size(), - buffer.description.sample_block.dumpStructure()); + "{}, but block columns size is {} (full structure: {})", + col_idx, columns.size(), sample_block.dumpStructure()); + } } -void MaterializedPostgreSQLConsumer::insertValue(StorageData::Buffer & buffer, const std::string & value, size_t column_idx) +void MaterializedPostgreSQLConsumer::insertValue(StorageData & storage_data, const std::string & value, size_t column_idx) { - assertCorrectInsertion(buffer, column_idx); + auto & buffer = storage_data.getBuffer(); + buffer.assertInsertIsPossible(column_idx); - const auto & sample = buffer.description.sample_block.getByPosition(column_idx); - bool is_nullable = buffer.description.types[column_idx].second; + const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx); + auto & column = buffer.columns[column_idx]; + + const size_t column_idx_in_table = storage_data.table_description.sample_block.getPositionByName(column_type_and_name.name); + const auto & type_description = storage_data.table_description.types[column_idx_in_table]; try { - if (is_nullable) + if (column_type_and_name.type->isNullable()) { - ColumnNullable & 
column_nullable = assert_cast(*buffer.columns[column_idx]); - const auto & data_type = assert_cast(*sample.type); + ColumnNullable & column_nullable = assert_cast(*column); + const auto & data_type = assert_cast(*column_type_and_name.type); insertPostgreSQLValue( - column_nullable.getNestedColumn(), value, - buffer.description.types[column_idx].first, data_type.getNestedType(), buffer.array_info, column_idx); + column_nullable.getNestedColumn(), value, type_description.first, + data_type.getNestedType(), storage_data.array_info, column_idx_in_table); column_nullable.getNullMapData().emplace_back(0); } else { insertPostgreSQLValue( - *buffer.columns[column_idx], value, - buffer.description.types[column_idx].first, sample.type, - buffer.array_info, column_idx); + *column, value, type_description.first, column_type_and_name.type, + storage_data.array_info, column_idx_in_table); } } catch (const pqxx::conversion_error & e) { - LOG_ERROR(log, "Conversion failed while inserting PostgreSQL value {}, will insert default value. Error: {}", value, e.what()); - insertDefaultValue(buffer, column_idx); + LOG_ERROR(log, "Conversion failed while inserting PostgreSQL value {}, " + "will insert default value. Error: {}", value, e.what()); + + insertDefaultPostgreSQLValue(*column, *column_type_and_name.column); } } - -void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData::Buffer & buffer, size_t column_idx) +void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData & storage_data, size_t column_idx) { - assertCorrectInsertion(buffer, column_idx); + auto & buffer = storage_data.getBuffer(); + buffer.assertInsertIsPossible(column_idx); - const auto & sample = buffer.description.sample_block.getByPosition(column_idx); - insertDefaultPostgreSQLValue(*buffer.columns[column_idx], *sample.column); + const auto & column_type_and_name = buffer.sample_block.getByPosition(column_idx); + auto & column = buffer.columns[column_idx]; + + insertDefaultPostgreSQLValue(*column, *column_type_and_name.column); } - void MaterializedPostgreSQLConsumer::readString(const char * message, size_t & pos, size_t size, String & result) { assert(size > pos + 2); @@ -173,7 +223,6 @@ void MaterializedPostgreSQLConsumer::readString(const char * message, size_t & p } } - template T MaterializedPostgreSQLConsumer::unhexN(const char * message, size_t pos, size_t n) { @@ -186,7 +235,6 @@ T MaterializedPostgreSQLConsumer::unhexN(const char * message, size_t pos, size_ return result; } - Int64 MaterializedPostgreSQLConsumer::readInt64(const char * message, size_t & pos, [[maybe_unused]] size_t size) { assert(size >= pos + 16); @@ -195,7 +243,6 @@ Int64 MaterializedPostgreSQLConsumer::readInt64(const char * message, size_t & p return result; } - Int32 MaterializedPostgreSQLConsumer::readInt32(const char * message, size_t & pos, [[maybe_unused]] size_t size) { assert(size >= pos + 8); @@ -204,7 +251,6 @@ Int32 MaterializedPostgreSQLConsumer::readInt32(const char * message, size_t & p return result; } - Int16 MaterializedPostgreSQLConsumer::readInt16(const char * message, size_t & pos, [[maybe_unused]] size_t size) { assert(size >= pos + 4); @@ -213,7 +259,6 @@ Int16 MaterializedPostgreSQLConsumer::readInt16(const char * message, size_t & p return result; } - Int8 MaterializedPostgreSQLConsumer::readInt8(const char * message, size_t & pos, [[maybe_unused]] size_t size) { assert(size >= pos + 2); @@ -222,25 +267,23 @@ Int8 MaterializedPostgreSQLConsumer::readInt8(const char * message, size_t & pos return result; } - void 
MaterializedPostgreSQLConsumer::readTupleData( - StorageData::Buffer & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value) + StorageData & storage_data, + const char * message, + size_t & pos, + size_t size, + PostgreSQLQuery type, + bool old_value) { Int16 num_columns = readInt16(message, pos, size); - /// Sanity check. In fact, it was already checked. - if (static_cast(num_columns) + 2 != buffer.getColumnsNum()) /// +2 -- sign and version columns - throw Exception(ErrorCodes::POSTGRESQL_REPLICATION_INTERNAL_ERROR, - "Number of columns does not match. Got: {}, expected {}, current buffer structure: {}", - num_columns, buffer.getColumnsNum(), buffer.description.sample_block.dumpStructure()); - auto proccess_column_value = [&](Int8 identifier, Int16 column_idx) { switch (identifier) // NOLINT(bugprone-switch-missing-default-case) { case 'n': /// NULL { - insertDefaultValue(buffer, column_idx); + insertDefaultValue(storage_data, column_idx); break; } case 't': /// Text formatted value @@ -250,7 +293,7 @@ void MaterializedPostgreSQLConsumer::readTupleData( for (Int32 i = 0; i < col_len; ++i) value += readInt8(message, pos, size); - insertValue(buffer, value, column_idx); + insertValue(storage_data, value, column_idx); break; } case 'u': /// TOAST value && unchanged at the same time. Actual value is not sent. @@ -258,13 +301,13 @@ void MaterializedPostgreSQLConsumer::readTupleData( /// TOAST values are not supported. (TOAST values are values that are considered in postgres /// to be too large to be stored directly) LOG_WARNING(log, "Got TOAST value, which is not supported, default value will be used instead."); - insertDefaultValue(buffer, column_idx); + insertDefaultValue(storage_data, column_idx); break; } case 'b': /// Binary data. { LOG_WARNING(log, "We do not yet process this format of data, will insert default value"); - insertDefaultValue(buffer, column_idx); + insertDefaultValue(storage_data, column_idx); break; } default: @@ -272,7 +315,7 @@ void MaterializedPostgreSQLConsumer::readTupleData( LOG_WARNING(log, "Unexpected identifier: {}. This is a bug! Please report an issue on github", identifier); chassert(false); - insertDefaultValue(buffer, column_idx); + insertDefaultValue(storage_data, column_idx); break; } } @@ -291,7 +334,7 @@ void MaterializedPostgreSQLConsumer::readTupleData( "Got error while receiving value for column {}, will insert default value. Error: {}", column_idx, getCurrentExceptionMessage(true)); - insertDefaultValue(buffer, column_idx); + insertDefaultValue(storage_data, column_idx); /// Let's collect only the first exception. /// This delaying of error throw is needed because /// some errors can be ignored and just logged, @@ -301,19 +344,20 @@ void MaterializedPostgreSQLConsumer::readTupleData( } } + auto & columns = storage_data.getBuffer().columns; switch (type) { case PostgreSQLQuery::INSERT: { - buffer.columns[num_columns]->insert(static_cast(1)); - buffer.columns[num_columns + 1]->insert(lsn_value); + columns[num_columns]->insert(static_cast(1)); + columns[num_columns + 1]->insert(lsn_value); break; } case PostgreSQLQuery::DELETE: { - buffer.columns[num_columns]->insert(static_cast(-1)); - buffer.columns[num_columns + 1]->insert(lsn_value); + columns[num_columns]->insert(static_cast(-1)); + columns[num_columns + 1]->insert(lsn_value); break; } @@ -321,11 +365,11 @@ void MaterializedPostgreSQLConsumer::readTupleData( { /// Process old value in case changed value is a primary key. 
if (old_value) - buffer.columns[num_columns]->insert(static_cast(-1)); + columns[num_columns]->insert(static_cast(-1)); else - buffer.columns[num_columns]->insert(static_cast(1)); + columns[num_columns]->insert(static_cast(1)); - buffer.columns[num_columns + 1]->insert(lsn_value); + columns[num_columns + 1]->insert(lsn_value); break; } @@ -335,7 +379,6 @@ void MaterializedPostgreSQLConsumer::readTupleData( std::rethrow_exception(error); } - /// https://www.postgresql.org/docs/13/protocol-logicalrep-message-formats.html void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * replication_message, size_t size) { @@ -366,10 +409,10 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl return; Int8 new_tuple = readInt8(replication_message, pos, size); - auto & buffer = storages.find(table_name)->second.buffer; + auto & storage_data = storages.find(table_name)->second; if (new_tuple) - readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::INSERT); + readTupleData(storage_data, replication_message, pos, size, PostgreSQLQuery::INSERT); break; } @@ -386,7 +429,7 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl if (!isSyncAllowed(relation_id, table_name)) return; - auto & buffer = storages.find(table_name)->second.buffer; + auto & storage_data = storages.find(table_name)->second; auto proccess_identifier = [&](Int8 identifier) -> bool { @@ -401,13 +444,13 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl /// it is much more efficient to use replica identity index, but support all possible cases. case 'O': { - readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::UPDATE, true); + readTupleData(storage_data, replication_message, pos, size, PostgreSQLQuery::UPDATE, true); break; } case 'N': { /// New row. - readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::UPDATE); + readTupleData(storage_data, replication_message, pos, size, PostgreSQLQuery::UPDATE); read_next = false; break; } @@ -441,8 +484,8 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl /// 0 or 1 if replica identity is set to full. For now only default replica identity is supported (with primary keys). readInt8(replication_message, pos, size); - auto & buffer = storages.find(table_name)->second.buffer; - readTupleData(buffer, replication_message, pos, size, PostgreSQLQuery::DELETE); + auto & storage_data = storages.find(table_name)->second; + readTupleData(storage_data, replication_message, pos, size, PostgreSQLQuery::DELETE); break; } case 'C': // Commit @@ -490,8 +533,6 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl return; } - auto & buffer = storage_iter->second.buffer; - /// 'd' - default (primary key if any) /// 'n' - nothing /// 'f' - all columns (set replica identity full) @@ -509,29 +550,13 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl Int16 num_columns = readInt16(replication_message, pos, size); - if (static_cast(num_columns) + 2 != buffer.getColumnsNum()) /// +2 -- sign and version columns - { - markTableAsSkipped(relation_id, table_name); - return; - } - - if (static_cast(num_columns) != buffer.attributes.size()) - { -#ifndef NDEBUG - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Mismatch in attributes size. Got {}, expected {}. It's a bug. 
Current buffer structure: {}", - num_columns, buffer.attributes.size(), buffer.description.sample_block.dumpStructure()); -#else - LOG_ERROR(log, "Mismatch in attributes size. Got {}, expected {}. It's a bug. Current buffer structure: {}", - num_columns, buffer.attributes.size(), buffer.description.sample_block.dumpStructure()); - markTableAsSkipped(relation_id, table_name); - return; -#endif - } - Int32 data_type_id; Int32 type_modifier; /// For example, n in varchar(n) + auto & storage_data = storage_iter->second; + const auto & description = storage_data.table_description; + + ColumnsWithTypeAndName columns; for (uint16_t i = 0; i < num_columns; ++i) { String column_name; @@ -541,13 +566,22 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl data_type_id = readInt32(replication_message, pos, size); type_modifier = readInt32(replication_message, pos, size); - if (buffer.attributes[i].atttypid != data_type_id || buffer.attributes[i].atttypmod != type_modifier) + columns.push_back(description.sample_block.getByName(column_name)); + + const auto & attributes_it = storage_data.columns_attributes.find(column_name); + if (attributes_it == storage_data.columns_attributes.end()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column: {}", column_name); + + const auto & attributes = attributes_it->second; + if (attributes.atttypid != data_type_id || attributes.atttypmod != type_modifier) { + LOG_TEST(log, "Column {} has a different type", column_name); markTableAsSkipped(relation_id, table_name); return; } } + storage_data.setBuffer(std::make_unique(std::move(columns), description)); tables_to_sync.insert(table_name); break; } @@ -563,7 +597,6 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl } } - void MaterializedPostgreSQLConsumer::syncTables() { size_t synced_tables = 0; @@ -571,8 +604,8 @@ void MaterializedPostgreSQLConsumer::syncTables() { auto table_name = *tables_to_sync.begin(); auto & storage_data = storages.find(table_name)->second; - Block result_rows = storage_data.buffer.description.sample_block.cloneWithColumns(std::move(storage_data.buffer.columns)); - storage_data.buffer.columns = storage_data.buffer.description.sample_block.cloneEmptyColumns(); + auto & buffer = storage_data.getBuffer(); + Block result_rows = buffer.sample_block.cloneWithColumns(std::move(buffer.columns)); try { @@ -585,7 +618,7 @@ void MaterializedPostgreSQLConsumer::syncTables() auto insert = std::make_shared(); insert->table_id = storage->getStorageID(); - insert->columns = storage_data.buffer.columns_ast; + insert->columns = std::make_shared(buffer.columns_ast); InterpreterInsertQuery interpreter(insert, insert_context, true); auto io = interpreter.execute(); @@ -603,10 +636,11 @@ void MaterializedPostgreSQLConsumer::syncTables() catch (...) { /// Retry this buffer later. 
- storage_data.buffer.columns = result_rows.mutateColumns(); + buffer.columns = result_rows.mutateColumns(); throw; } + storage_data.setBuffer(nullptr); tables_to_sync.erase(tables_to_sync.begin()); } @@ -616,7 +650,6 @@ void MaterializedPostgreSQLConsumer::syncTables() updateLsn(); } - void MaterializedPostgreSQLConsumer::updateLsn() { try @@ -632,7 +665,6 @@ void MaterializedPostgreSQLConsumer::updateLsn() } } - String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr tx) { std::string query_str = fmt::format("SELECT end_lsn FROM pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn); @@ -644,7 +676,6 @@ String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr; + + const StoragePtr storage; + const ExternalResultDescription table_description; + const PostgreSQLTableStructure::Attributes columns_attributes; + const ArrayInfo array_info; + struct Buffer { - ExternalResultDescription description; + Block sample_block; MutableColumns columns; + ASTExpressionList columns_ast; - /// Needed to pass to insert query columns list in syncTables(). - std::shared_ptr columns_ast; - /// Needed for insertPostgreSQLValue() method to parse array - std::unordered_map array_info; - /// To validate ddl. - PostgreSQLTableStructure::Attributes attributes; + explicit Buffer(ColumnsWithTypeAndName && columns_, const ExternalResultDescription & table_description_); - Buffer(StorageMetadataPtr storage_metadata, const PostgreSQLTableStructure::Attributes & attributes_); - - size_t getColumnsNum() const - { - const auto & sample_block = description.sample_block; - return sample_block.columns(); - } + void assertInsertIsPossible(size_t col_idx) const; }; - StoragePtr storage; - Buffer buffer; + Buffer & getBuffer(); - explicit StorageData(const StorageInfo & storage_info); - StorageData(const StorageData & other) = delete; + void setBuffer(std::unique_ptr buffer_) { buffer = std::move(buffer_); } + + private: + std::unique_ptr buffer; }; using Storages = std::unordered_map; @@ -97,8 +101,8 @@ private: bool isSyncAllowed(Int32 relation_id, const String & relation_name); - static void insertDefaultValue(StorageData::Buffer & buffer, size_t column_idx); - void insertValue(StorageData::Buffer & buffer, const std::string & value, size_t column_idx); + static void insertDefaultValue(StorageData & storage_data, size_t column_idx); + void insertValue(StorageData & storage_data, const std::string & value, size_t column_idx); enum class PostgreSQLQuery { @@ -107,7 +111,7 @@ private: DELETE }; - void readTupleData(StorageData::Buffer & buffer, const char * message, size_t & pos, size_t size, PostgreSQLQuery type, bool old_value = false); + void readTupleData(StorageData & storage_data, const char * message, size_t & pos, size_t size, PostgreSQLQuery type, bool old_value = false); template static T unhexN(const char * message, size_t pos, size_t n); @@ -119,8 +123,6 @@ private: void markTableAsSkipped(Int32 relation_id, const String & relation_name); - static void assertCorrectInsertion(StorageData::Buffer & buffer, size_t column_idx); - /// lsn - log sequence number, like wal offset (64 bit). 
static Int64 getLSNValue(const std::string & lsn) { diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index ee38dcb44d4..7a73bdf153b 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -337,6 +337,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) dropReplicationSlot(tx); initial_sync(); + LOG_DEBUG(log, "Loaded {} tables", nested_storages.size()); } /// Synchronization and initial load already took place - do not create any new tables, just fetch StoragePtr's /// and pass them to replication consumer. @@ -414,16 +415,18 @@ StorageInfo PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection std::string query_str = fmt::format("SET TRANSACTION SNAPSHOT '{}'", snapshot_name); tx->exec(query_str); + auto table_structure = fetchTableStructure(*tx, table_name); + if (!table_structure->physical_columns) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No table attributes"); + + auto table_attributes = table_structure->physical_columns->attributes; + /// Load from snapshot, which will show table state before creation of replication slot. /// Already connected to needed database, no need to add it to query. auto quoted_name = doubleQuoteWithSchema(table_name); query_str = fmt::format("SELECT * FROM ONLY {}", quoted_name); - LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name); - auto table_structure = fetchTableStructure(*tx, table_name); - if (!table_structure->physical_columns) - throw Exception(ErrorCodes::LOGICAL_ERROR, "No table attributes"); - auto table_attributes = table_structure->physical_columns->attributes; + LOG_DEBUG(log, "Loading PostgreSQL table {}.{}", postgres_database, quoted_name); auto table_override = tryGetTableOverride(current_database_name, table_name); materialized_storage->createNestedIfNeeded(std::move(table_structure), table_override ? table_override->as() : nullptr); @@ -444,12 +447,16 @@ StorageInfo PostgreSQLReplicationHandler::loadFromSnapshot(postgres::Connection assertBlocksHaveEqualStructure(input->getPort().getHeader(), block_io.pipeline.getHeader(), "postgresql replica load from snapshot"); block_io.pipeline.complete(Pipe(std::move(input))); + /// TODO: make a test when we fail in the middle of inserting data from source. 
+ CompletedPipelineExecutor executor(block_io.pipeline); executor.execute(); materialized_storage->set(nested_storage); auto nested_table_id = nested_storage->getStorageID(); - LOG_DEBUG(log, "Loaded table {}.{} (uuid: {})", nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid)); + + LOG_DEBUG(log, "Loaded table {}.{} (uuid: {})", + nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid)); return StorageInfo(nested_storage, std::move(table_attributes)); } diff --git a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp index d83722dba6c..c753a41be40 100644 --- a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp +++ b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp @@ -25,6 +25,8 @@ #include #include #include +#include +#include #include #include @@ -195,7 +197,8 @@ void StorageMaterializedPostgreSQL::createNestedIfNeeded(PostgreSQLTableStructur const auto ast_create = getCreateNestedTableQuery(std::move(table_structure), table_override); auto table_id = getStorageID(); auto tmp_nested_table_id = StorageID(table_id.database_name, getNestedTableName()); - LOG_DEBUG(log, "Creating clickhouse table for postgresql table {}", table_id.getNameForLogs()); + LOG_DEBUG(log, "Creating clickhouse table for postgresql table {} (ast: {})", + table_id.getNameForLogs(), serializeAST(*ast_create)); InterpreterCreateQuery interpreter(ast_create, nested_context); interpreter.execute(); @@ -359,7 +362,8 @@ ASTPtr StorageMaterializedPostgreSQL::getColumnDeclaration(const DataTypePtr & d } -std::shared_ptr StorageMaterializedPostgreSQL::getColumnsExpressionList(const NamesAndTypesList & columns) const +std::shared_ptr +StorageMaterializedPostgreSQL::getColumnsExpressionList(const NamesAndTypesList & columns, std::unordered_map defaults) const { auto columns_expression_list = std::make_shared(); for (const auto & [name, type] : columns) @@ -369,6 +373,12 @@ std::shared_ptr StorageMaterializedPostgreSQL::getColumnsExpr column_declaration->name = name; column_declaration->type = getColumnDeclaration(type); + if (auto it = defaults.find(name); it != defaults.end()) + { + column_declaration->default_expression = it->second; + column_declaration->default_specifier = "DEFAULT"; + } + columns_expression_list->children.emplace_back(column_declaration); } return columns_expression_list; @@ -460,8 +470,28 @@ ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery( } else { - ordinary_columns_and_types = table_structure->physical_columns->columns; - columns_declare_list->set(columns_declare_list->columns, getColumnsExpressionList(ordinary_columns_and_types)); + const auto columns = table_structure->physical_columns; + std::unordered_map defaults; + for (const auto & col : columns->columns) + { + const auto & attr = columns->attributes.at(col.name); + if (!attr.attr_def.empty()) + { + ParserExpression expr_parser; + Expected expected; + ASTPtr result; + + Tokens tokens(attr.attr_def.data(), attr.attr_def.data() + attr.attr_def.size()); + IParser::Pos pos(tokens, DBMS_DEFAULT_MAX_PARSER_DEPTH); + if (!expr_parser.parse(pos, result, expected)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failed to parse default expression: {}", attr.attr_def); + } + defaults.emplace(col.name, result); + } + } + ordinary_columns_and_types = columns->columns; + columns_declare_list->set(columns_declare_list->columns, getColumnsExpressionList(ordinary_columns_and_types, defaults)); 
} if (ordinary_columns_and_types.empty()) diff --git a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.h b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.h index af0adb10f9f..9c3c195e34f 100644 --- a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.h +++ b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.h @@ -109,7 +109,8 @@ public: ASTPtr getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure, const ASTTableOverride * table_override); - std::shared_ptr getColumnsExpressionList(const NamesAndTypesList & columns) const; + std::shared_ptr getColumnsExpressionList( + const NamesAndTypesList & columns, std::unordered_map defaults = {}) const; StoragePtr getNested() const; diff --git a/tests/integration/test_postgresql_replica_database_engine_2/test.py b/tests/integration/test_postgresql_replica_database_engine_2/test.py index e8053730c44..2a72c3591e9 100644 --- a/tests/integration/test_postgresql_replica_database_engine_2/test.py +++ b/tests/integration/test_postgresql_replica_database_engine_2/test.py @@ -810,6 +810,150 @@ def test_replica_consumer(started_cluster): pg_manager_instance2.clear() +def test_replica_consumer(started_cluster): + table = "test_replica_consumer" + pg_manager_instance2.restart() + + pg_manager.create_postgres_table(table) + instance.query( + f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(0, 50)" + ) + + for pm in [pg_manager, pg_manager_instance2]: + pm.create_materialized_db( + ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + settings=[ + f"materialized_postgresql_tables_list = '{table}'", + "materialized_postgresql_backoff_min_ms = 100", + "materialized_postgresql_backoff_max_ms = 100", + "materialized_postgresql_use_unique_replication_consumer_identifier = 1", + ], + ) + + check_tables_are_synchronized( + instance, table, postgres_database=pg_manager.get_default_database() + ) + check_tables_are_synchronized( + instance2, table, postgres_database=pg_manager_instance2.get_default_database() + ) + + assert 50 == int(instance.query(f"SELECT count() FROM test_database.{table}")) + assert 50 == int(instance2.query(f"SELECT count() FROM test_database.{table}")) + + instance.query( + f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(1000, 1000)" + ) + + check_tables_are_synchronized( + instance, table, postgres_database=pg_manager.get_default_database() + ) + check_tables_are_synchronized( + instance2, table, postgres_database=pg_manager_instance2.get_default_database() + ) + + assert 1050 == int(instance.query(f"SELECT count() FROM test_database.{table}")) + assert 1050 == int(instance2.query(f"SELECT count() FROM test_database.{table}")) + + for pm in [pg_manager, pg_manager_instance2]: + pm.drop_materialized_db() + pg_manager_instance2.clear() + + +def test_generated_columns(started_cluster): + table = "test_generated_columns" + + pg_manager.create_postgres_table( + table, + "", + f"""CREATE TABLE {table} ( + key integer PRIMARY KEY, + x integer, + y integer GENERATED ALWAYS AS (x*2) STORED, + z text); + """, + ) + + pg_manager.execute(f"insert into {table} (key, x, z) values (1,1,'1');") + pg_manager.execute(f"insert into {table} (key, x, z) values (2,2,'2');") + + pg_manager.create_materialized_db( + ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + settings=[ + f"materialized_postgresql_tables_list = '{table}'", + "materialized_postgresql_backoff_min_ms = 100", + "materialized_postgresql_backoff_max_ms = 100", + ], + ) + + 
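+    # The first check below covers rows loaded through the initial snapshot; the inserts that follow it exercise the logical replication stream.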
check_tables_are_synchronized( + instance, table, postgres_database=pg_manager.get_default_database() + ) + + pg_manager.execute(f"insert into {table} (key, x, z) values (3,3,'3');") + pg_manager.execute(f"insert into {table} (key, x, z) values (4,4,'4');") + + check_tables_are_synchronized( + instance, table, postgres_database=pg_manager.get_default_database() + ) + + pg_manager.execute(f"insert into {table} (key, x, z) values (5,5,'5');") + pg_manager.execute(f"insert into {table} (key, x, z) values (6,6,'6');") + + check_tables_are_synchronized( + instance, table, postgres_database=pg_manager.get_default_database() + ) + + +def test_default_columns(started_cluster): + table = "test_default_columns" + + pg_manager.create_postgres_table( + table, + "", + f"""CREATE TABLE {table} ( + key integer PRIMARY KEY, + x integer, + y text DEFAULT 'y1', + z integer, + a text DEFAULT 'a1', + b integer); + """, + ) + + pg_manager.execute(f"insert into {table} (key, x, z, b) values (1,1,1,1);") + pg_manager.execute(f"insert into {table} (key, x, z, b) values (2,2,2,2);") + + pg_manager.create_materialized_db( + ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + settings=[ + f"materialized_postgresql_tables_list = '{table}'", + "materialized_postgresql_backoff_min_ms = 100", + "materialized_postgresql_backoff_max_ms = 100", + ], + ) + + check_tables_are_synchronized( + instance, table, postgres_database=pg_manager.get_default_database() + ) + + pg_manager.execute(f"insert into {table} (key, x, z, b) values (3,3,3,3);") + pg_manager.execute(f"insert into {table} (key, x, z, b) values (4,4,4,4);") + + check_tables_are_synchronized( + instance, table, postgres_database=pg_manager.get_default_database() + ) + + pg_manager.execute(f"insert into {table} (key, x, z, b) values (5,5,5,5);") + pg_manager.execute(f"insert into {table} (key, x, z, b) values (6,6,6,6);") + + check_tables_are_synchronized( + instance, table, postgres_database=pg_manager.get_default_database() + ) + + if __name__ == "__main__": cluster.start() input("Cluster created, press any key to destroy...")
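For reference, a sketch of the PostgreSQL catalog lookups that the new code in fetchPostgreSQLTableStructure.cpp relies on, run against the test_generated_columns fixture above (y integer GENERATED ALWAYS AS (x*2) STORED). The queries are the ones built in the patch; the result rows are only an approximation of typical psql output.

-- Column metadata; attgenerated is '' for ordinary columns and 's' for STORED generated ones.
SELECT attname AS name, format_type(atttypid, atttypmod) AS type,
       attnotnull AS not_null, attndims AS dims,
       atttypid AS type_id, atttypmod AS type_modifier,
       attgenerated AS generated
FROM pg_attribute
WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'test_generated_columns'
      AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public'))
  AND NOT attisdropped AND attnum > 0;
--  name | type    | not_null | dims | type_id | type_modifier | generated
--  key  | integer | t        | 0    | 23      | -1            |
--  x    | integer | f        | 0    | 23      | -1            |
--  y    | integer | f        | 0    | 23      | -1            | s
--  z    | text    | f        | 0    | 25      | -1            |

-- Expressions behind GENERATED/DEFAULT columns; adnum is 1-based, which is why the
-- handler indexes table.physical_columns->names[adnum - 1].
SELECT adnum, pg_get_expr(adbin, adrelid) AS generated_expression
FROM pg_attrdef
WHERE adrelid = (SELECT oid FROM pg_class WHERE relname = 'test_generated_columns');
--  adnum | generated_expression
--      3 | (x * 2)

Note that the check_generated guard in the patch only issues the second query when at least one column reports attgenerated = 's'.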