diff --git a/src/DataStreams/PostgreSQLBlockInputStream.cpp b/src/DataStreams/PostgreSQLBlockInputStream.cpp index 4646a8a9d32..a52ca1e58a4 100644 --- a/src/DataStreams/PostgreSQLBlockInputStream.cpp +++ b/src/DataStreams/PostgreSQLBlockInputStream.cpp @@ -22,11 +22,6 @@ namespace DB { -namespace ErrorCodes -{ - extern const int BAD_ARGUMENTS; -} - PostgreSQLBlockInputStream::PostgreSQLBlockInputStream( std::unique_ptr tx_, const std::string & query_str_, @@ -38,8 +33,8 @@ PostgreSQLBlockInputStream::PostgreSQLBlockInputStream( { description.init(sample_block); for (const auto idx : ext::range(0, description.sample_block.columns())) - if (description.types[idx].first == ValueType::vtArray) - prepareArrayInfo(idx, description.sample_block.getByPosition(idx).type); + if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray) + preparePostgreSQLArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type); /// pqxx::stream_from uses COPY command, will get error if ';' is present if (query_str.ends_with(';')) query_str.resize(query_str.size() - 1); @@ -80,12 +75,17 @@ Block PostgreSQLBlockInputStream::readImpl() { ColumnNullable & column_nullable = assert_cast(*columns[idx]); const auto & data_type = assert_cast(*sample.type); - insertValue(column_nullable.getNestedColumn(), (*row)[idx], description.types[idx].first, data_type.getNestedType(), idx); + + insertPostgreSQLValue( + column_nullable.getNestedColumn(), (*row)[idx], + description.types[idx].first, data_type.getNestedType(), array_info, idx); + column_nullable.getNullMapData().emplace_back(0); } else { - insertValue(*columns[idx], (*row)[idx], description.types[idx].first, sample.type, idx); + insertPostgreSQLValue( + *columns[idx], (*row)[idx], description.types[idx].first, sample.type, array_info, idx); } } else @@ -113,183 +113,6 @@ void PostgreSQLBlockInputStream::readSuffix() } -void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view value, - const ExternalResultDescription::ValueType type, const DataTypePtr data_type, size_t idx) -{ - switch (type) - { - case ValueType::vtUInt8: - assert_cast(column).insertValue(pqxx::from_string(value)); - break; - case ValueType::vtUInt16: - assert_cast(column).insertValue(pqxx::from_string(value)); - break; - case ValueType::vtUInt32: - assert_cast(column).insertValue(pqxx::from_string(value)); - break; - case ValueType::vtUInt64: - assert_cast(column).insertValue(pqxx::from_string(value)); - break; - case ValueType::vtInt8: - assert_cast(column).insertValue(pqxx::from_string(value)); - break; - case ValueType::vtInt16: - assert_cast(column).insertValue(pqxx::from_string(value)); - break; - case ValueType::vtInt32: - assert_cast(column).insertValue(pqxx::from_string(value)); - break; - case ValueType::vtInt64: - assert_cast(column).insertValue(pqxx::from_string(value)); - break; - case ValueType::vtFloat32: - assert_cast(column).insertValue(pqxx::from_string(value)); - break; - case ValueType::vtFloat64: - assert_cast(column).insertValue(pqxx::from_string(value)); - break; - case ValueType::vtFixedString:[[fallthrough]]; - case ValueType::vtString: - assert_cast(column).insertData(value.data(), value.size()); - break; - case ValueType::vtUUID: - assert_cast(column).insert(parse(value.data(), value.size())); - break; - case ValueType::vtDate: - assert_cast(column).insertValue(UInt16{LocalDate{std::string(value)}.getDayNum()}); - break; - case ValueType::vtDateTime: - assert_cast(column).insertValue(time_t{LocalDateTime{std::string(value)}}); - break; - case ValueType::vtDateTime64:[[fallthrough]]; - case ValueType::vtDecimal32: [[fallthrough]]; - case ValueType::vtDecimal64: [[fallthrough]]; - case ValueType::vtDecimal128: [[fallthrough]]; - case ValueType::vtDecimal256: - { - ReadBufferFromString istr(value); - data_type->deserializeAsWholeText(column, istr, FormatSettings{}); - break; - } - case ValueType::vtArray: - { - pqxx::array_parser parser{value}; - std::pair parsed = parser.get_next(); - - size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info[idx].num_dimensions; - const auto parse_value = array_info[idx].pqxx_parser; - std::vector> dimensions(expected_dimensions + 1); - - while (parsed.first != pqxx::array_parser::juncture::done) - { - if ((parsed.first == pqxx::array_parser::juncture::row_start) && (++dimension > expected_dimensions)) - throw Exception("Got more dimensions than expected", ErrorCodes::BAD_ARGUMENTS); - - else if (parsed.first == pqxx::array_parser::juncture::string_value) - dimensions[dimension].emplace_back(parse_value(parsed.second)); - - else if (parsed.first == pqxx::array_parser::juncture::null_value) - dimensions[dimension].emplace_back(array_info[idx].default_value); - - else if (parsed.first == pqxx::array_parser::juncture::row_end) - { - max_dimension = std::max(max_dimension, dimension); - - if (--dimension == 0) - break; - - dimensions[dimension].emplace_back(Array(dimensions[dimension + 1].begin(), dimensions[dimension + 1].end())); - dimensions[dimension + 1].clear(); - } - - parsed = parser.get_next(); - } - - if (max_dimension < expected_dimensions) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Got less dimensions than expected. ({} instead of {})", max_dimension, expected_dimensions); - - assert_cast(column).insert(Array(dimensions[1].begin(), dimensions[1].end())); - break; - } - } -} - - -void PostgreSQLBlockInputStream::prepareArrayInfo(size_t column_idx, const DataTypePtr data_type) -{ - const auto * array_type = typeid_cast(data_type.get()); - auto nested = array_type->getNestedType(); - - size_t count_dimensions = 1; - while (isArray(nested)) - { - ++count_dimensions; - nested = typeid_cast(nested.get())->getNestedType(); - } - - Field default_value = nested->getDefault(); - if (nested->isNullable()) - nested = static_cast(nested.get())->getNestedType(); - - WhichDataType which(nested); - std::function parser; - - if (which.isUInt8() || which.isUInt16()) - parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; - else if (which.isInt8() || which.isInt16()) - parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; - else if (which.isUInt32()) - parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; - else if (which.isInt32()) - parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; - else if (which.isUInt64()) - parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; - else if (which.isInt64()) - parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; - else if (which.isFloat32()) - parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; - else if (which.isFloat64()) - parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; - else if (which.isString() || which.isFixedString()) - parser = [](std::string & field) -> Field { return field; }; - else if (which.isDate()) - parser = [](std::string & field) -> Field { return UInt16{LocalDate{field}.getDayNum()}; }; - else if (which.isDateTime()) - parser = [](std::string & field) -> Field { return time_t{LocalDateTime{field}}; }; - else if (which.isDecimal32()) - parser = [nested](std::string & field) -> Field - { - const auto & type = typeid_cast *>(nested.get()); - DataTypeDecimal res(getDecimalPrecision(*type), getDecimalScale(*type)); - return convertFieldToType(field, res); - }; - else if (which.isDecimal64()) - parser = [nested](std::string & field) -> Field - { - const auto & type = typeid_cast *>(nested.get()); - DataTypeDecimal res(getDecimalPrecision(*type), getDecimalScale(*type)); - return convertFieldToType(field, res); - }; - else if (which.isDecimal128()) - parser = [nested](std::string & field) -> Field - { - const auto & type = typeid_cast *>(nested.get()); - DataTypeDecimal res(getDecimalPrecision(*type), getDecimalScale(*type)); - return convertFieldToType(field, res); - }; - else if (which.isDecimal256()) - parser = [nested](std::string & field) -> Field - { - const auto & type = typeid_cast *>(nested.get()); - DataTypeDecimal res(getDecimalPrecision(*type), getDecimalScale(*type)); - return convertFieldToType(field, res); - }; - else - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type conversion to {} is not supported", nested->getName()); - - array_info[column_idx] = {count_dimensions, default_value, parser}; -} } diff --git a/src/DataStreams/PostgreSQLBlockInputStream.h b/src/DataStreams/PostgreSQLBlockInputStream.h index 1e52b48c7cf..c18ccd0f55e 100644 --- a/src/DataStreams/PostgreSQLBlockInputStream.h +++ b/src/DataStreams/PostgreSQLBlockInputStream.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace DB @@ -29,19 +30,14 @@ public: Block getHeader() const override { return description.sample_block.cloneEmpty(); } private: - using ValueType = ExternalResultDescription::ValueType; - void readPrefix() override; Block readImpl() override; void readSuffix() override; - void insertValue(IColumn & column, std::string_view value, - const ExternalResultDescription::ValueType type, const DataTypePtr data_type, size_t idx); void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); } - void prepareArrayInfo(size_t column_idx, const DataTypePtr data_type); String query_str; const UInt64 max_block_size; @@ -51,13 +47,7 @@ private: std::unique_ptr tx; std::unique_ptr stream; - struct ArrayInfo - { - size_t num_dimensions; - Field default_value; - std::function pqxx_parser; - }; - std::unordered_map array_info; + std::unordered_map array_info; }; } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaBlockInputStream.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicaBlockInputStream.cpp deleted file mode 100644 index 04ee68eb3aa..00000000000 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaBlockInputStream.cpp +++ /dev/null @@ -1,149 +0,0 @@ -#include "PostgreSQLReplicaBlockInputStream.h" - -#include -#include -#include - - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - -namespace DB -{ - -PostgreSQLReplicaBlockInputStream::PostgreSQLReplicaBlockInputStream( - StoragePostgreSQLReplica & storage_, - ConsumerBufferPtr buffer_, - const StorageMetadataPtr & metadata_snapshot_, - std::shared_ptr context_, - const Names & columns, - size_t max_block_size_) - : storage(storage_) - , buffer(buffer_) - , metadata_snapshot(metadata_snapshot_) - , context(context_) - , column_names(columns) - , max_block_size(max_block_size_) - , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) - , sample_block(non_virtual_header) - , virtual_header(metadata_snapshot->getSampleBlockForColumns({}, storage.getVirtuals(), storage.getStorageID())) -{ - for (const auto & column : virtual_header) - sample_block.insert(column); -} - - -PostgreSQLReplicaBlockInputStream::~PostgreSQLReplicaBlockInputStream() -{ -} - - -void PostgreSQLReplicaBlockInputStream::readPrefixImpl() -{ -} - - -Block PostgreSQLReplicaBlockInputStream::readImpl() -{ - if (!buffer || finished) - return Block(); - - finished = true; - - MutableColumns result_columns = non_virtual_header.cloneEmptyColumns(); - MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); - - auto input_format = FormatFactory::instance().getInputFormat( - "Values", *buffer, non_virtual_header, *context, max_block_size); - - InputPort port(input_format->getPort().getHeader(), input_format.get()); - connect(input_format->getPort(), port); - port.setNeeded(); - - auto read_rabbitmq_message = [&] - { - size_t new_rows = 0; - - while (true) - { - auto status = input_format->prepare(); - - switch (status) - { - case IProcessor::Status::Ready: - input_format->work(); - break; - - case IProcessor::Status::Finished: - input_format->resetParser(); - return new_rows; - - case IProcessor::Status::PortFull: - { - auto chunk = port.pull(); - - auto chunk_rows = chunk.getNumRows(); - new_rows += chunk_rows; - - auto columns = chunk.detachColumns(); - - for (size_t i = 0, s = columns.size(); i < s; ++i) - { - result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size()); - } - break; - } - case IProcessor::Status::NeedData: - case IProcessor::Status::Async: - case IProcessor::Status::ExpandPipeline: - throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR); - } - } - }; - - size_t total_rows = 0; - - while (true) - { - if (buffer->eof()) - break; - - auto new_rows = read_rabbitmq_message(); - - if (new_rows) - { - //auto timestamp = buffer->getTimestamp(); - //for (size_t i = 0; i < new_rows; ++i) - //{ - // virtual_columns[0]->insert(timestamp); - //} - - total_rows = total_rows + new_rows; - } - - buffer->allowNext(); - - if (total_rows >= max_block_size || !checkTimeLimit()) - break; - } - - if (total_rows == 0) - return Block(); - - auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns)); - auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns)); - - for (const auto & column : virtual_block.getColumnsWithTypeAndName()) - result_block.insert(column); - - return result_block; -} - - -void PostgreSQLReplicaBlockInputStream::readSuffixImpl() -{ -} - -} diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaBlockInputStream.h b/src/Storages/PostgreSQL/PostgreSQLReplicaBlockInputStream.h deleted file mode 100644 index 995c640682a..00000000000 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaBlockInputStream.h +++ /dev/null @@ -1,47 +0,0 @@ -#pragma once - -#include -#include "StoragePostgreSQLReplica.h" -#include "PostgreSQLReplicaConsumerBuffer.h" -#include "buffer_fwd.h" - - -namespace DB -{ - -class PostgreSQLReplicaBlockInputStream : public IBlockInputStream -{ - -public: - PostgreSQLReplicaBlockInputStream( - StoragePostgreSQLReplica & storage_, - ConsumerBufferPtr buffer_, - const StorageMetadataPtr & metadata_snapshot_, - std::shared_ptr context_, - const Names & columns, - size_t max_block_size_); - - ~PostgreSQLReplicaBlockInputStream() override; - - String getName() const override { return storage.getName(); } - Block getHeader() const override { return sample_block; } - - void readPrefixImpl() override; - Block readImpl() override; - void readSuffixImpl() override; - -private: - StoragePostgreSQLReplica & storage; - ConsumerBufferPtr buffer; - StorageMetadataPtr metadata_snapshot; - std::shared_ptr context; - Names column_names; - const size_t max_block_size; - - bool finished = false; - const Block non_virtual_header; - Block sample_block; - const Block virtual_header; -}; - -} diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp index 076863eb8dd..c38b898fdc1 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.cpp @@ -2,13 +2,30 @@ #include #include + #include + #include #include #include + #include #include + #include +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include +#include namespace DB { @@ -17,9 +34,9 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -static const auto wal_reader_reschedule_ms = 500; +static const auto reschedule_ms = 500; static const auto max_thread_work_duration_ms = 60000; -static const auto max_empty_slot_reads = 20; +static const auto max_empty_slot_reads = 2; PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer( std::shared_ptr context_, @@ -27,7 +44,9 @@ PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer( const std::string & conn_str, const std::string & replication_slot_name_, const std::string & publication_name_, - const LSNPosition & start_lsn) + const LSNPosition & start_lsn, + const size_t max_block_size_, + StoragePtr nested_storage_) : log(&Poco::Logger::get("PostgreSQLReaplicaConsumer")) , context(context_) , replication_slot_name(replication_slot_name_) @@ -35,40 +54,49 @@ PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer( , table_name(table_name_) , connection(std::make_shared(conn_str)) , current_lsn(start_lsn) + , max_block_size(max_block_size_) + , nested_storage(nested_storage_) + , sample_block(nested_storage->getInMemoryMetadata().getSampleBlock()) { replication_connection = std::make_shared(fmt::format("{} replication=database", conn_str)); - wal_reader_task = context->getSchedulePool().createTask("PostgreSQLReplicaWALReader", [this]{ WALReaderFunc(); }); + description.init(sample_block); + for (const auto idx : ext::range(0, description.sample_block.columns())) + if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray) + preparePostgreSQLArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type); + + columns = description.sample_block.cloneEmptyColumns(); + + wal_reader_task = context->getSchedulePool().createTask("PostgreSQLReplicaWALReader", [this]{ replicationStream(); }); wal_reader_task->deactivate(); } void PostgreSQLReplicaConsumer::startSynchronization() { - //wal_reader_task->activateAndSchedule(); + wal_reader_task->activateAndSchedule(); } void PostgreSQLReplicaConsumer::stopSynchronization() { stop_synchronization.store(true); - if (wal_reader_task) - wal_reader_task->deactivate(); + wal_reader_task->deactivate(); } -void PostgreSQLReplicaConsumer::WALReaderFunc() +void PostgreSQLReplicaConsumer::replicationStream() { size_t count_empty_slot_reads = 0; auto start_time = std::chrono::steady_clock::now(); - LOG_TRACE(log, "Starting synchronization thread"); + LOG_TRACE(log, "Starting replication stream"); while (!stop_synchronization) { if (!readFromReplicationSlot() && ++count_empty_slot_reads == max_empty_slot_reads) { - LOG_TRACE(log, "Reschedule synchronization. Replication slot is empty."); + LOG_TRACE(log, "Reschedule replication stream. Replication slot is empty."); break; } else @@ -78,13 +106,38 @@ void PostgreSQLReplicaConsumer::WALReaderFunc() auto duration = std::chrono::duration_cast(end_time - start_time); if (duration.count() > max_thread_work_duration_ms) { - LOG_TRACE(log, "Reschedule synchronization. Thread work duration limit exceeded."); + LOG_TRACE(log, "Reschedule replication_stream. Thread work duration limit exceeded."); break; } } if (!stop_synchronization) - wal_reader_task->scheduleAfter(wal_reader_reschedule_ms); + wal_reader_task->scheduleAfter(reschedule_ms); +} + + +void PostgreSQLReplicaConsumer::insertValue(std::string & value, size_t column_idx) +{ + const auto & sample = description.sample_block.getByPosition(column_idx); + bool is_nullable = description.types[column_idx].second; + + LOG_TRACE(log, "INSERTING VALUE {}", value); + if (is_nullable) + { + ColumnNullable & column_nullable = assert_cast(*columns[column_idx]); + const auto & data_type = assert_cast(*sample.type); + + insertPostgreSQLValue( + column_nullable.getNestedColumn(), value, + description.types[column_idx].first, data_type.getNestedType(), array_info, column_idx); + + column_nullable.getNullMapData().emplace_back(0); + } + else + { + insertPostgreSQLValue( + *columns[column_idx], value, description.types[column_idx].first, sample.type, array_info, column_idx); + } } @@ -150,17 +203,24 @@ void PostgreSQLReplicaConsumer::readTupleData(const char * message, size_t & pos Int16 num_columns = readInt16(message, pos); /// 'n' means nullable, 'u' means TOASTed value, 't' means text formatted data LOG_DEBUG(log, "num_columns {}", num_columns); - for (int k = 0; k < num_columns; ++k) + for (int column_idx = 0; column_idx < num_columns; ++column_idx) { char identifier = readInt8(message, pos); Int32 col_len = readInt32(message, pos); - String result; + String value; for (int i = 0; i < col_len; ++i) { - result += readInt8(message, pos); + value += readInt8(message, pos); } - LOG_DEBUG(log, "identifier {}, col_len {}, result {}", identifier, col_len, result); + + insertValue(value, column_idx); + + LOG_DEBUG(log, "identifier {}, col_len {}, value {}", identifier, col_len, value); } + + String val = "1"; + insertValue(val, num_columns); + insertValue(val, num_columns + 1); //readString(message, pos, size, result); } @@ -171,6 +231,7 @@ void PostgreSQLReplicaConsumer::decodeReplicationMessage(const char * replicatio size_t pos = 2; char type = readInt8(replication_message, pos); + LOG_TRACE(log, "TYPE: {}", type); switch (type) { @@ -180,6 +241,7 @@ void PostgreSQLReplicaConsumer::decodeReplicationMessage(const char * replicatio Int64 transaction_commit_timestamp = readInt64(replication_message, pos); LOG_DEBUG(log, "transaction lsn {}, transaction commit timespamp {}", transaction_end_lsn, transaction_commit_timestamp); + //current_lsn.lsn_value = transaction_end_lsn; break; } case 'C': // Commit @@ -191,6 +253,7 @@ void PostgreSQLReplicaConsumer::decodeReplicationMessage(const char * replicatio Int64 transaction_commit_timestamp = readInt64(replication_message, pos); LOG_DEBUG(log, "commit lsn {}, transaction lsn {}, transaction commit timestamp {}", commit_lsn, transaction_end_lsn, transaction_commit_timestamp); + final_lsn.lsn = current_lsn.lsn; break; } case 'O': // Origin @@ -245,16 +308,49 @@ void PostgreSQLReplicaConsumer::decodeReplicationMessage(const char * replicatio } +void PostgreSQLReplicaConsumer::syncIntoTable(Block & block) +{ + Context insert_context(*context); + insert_context.makeQueryContext(); + + auto insert = std::make_shared(); + insert->table_id = nested_storage->getStorageID(); + + InterpreterInsertQuery interpreter(insert, insert_context); + auto block_io = interpreter.execute(); + OneBlockInputStream input(block); + + copyData(input, *block_io.out); + LOG_TRACE(log, "TABLE SYNC END"); +} + + +void PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr ntx) +{ + LOG_TRACE(log, "CURRENT LSN FROM TO {}", final_lsn.lsn); + std::string query_str = fmt::format("SELECT pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn.lsn); + pqxx::result result{ntx->exec(query_str)}; + if (!result.empty()) + { + std::string s1 = result[0].size() > 0 && !result[0][0].is_null() ? result[0][0].as() : "NULL"; + std::string s2 = result[0].size() > 1 && !result[0][1].is_null() ? result[0][1].as() : "NULL"; + LOG_TRACE(log, "ADVANCE LSN: {} and {}", s1, s2); + + } +} + + /// Read binary changes from replication slot via copy command. bool PostgreSQLReplicaConsumer::readFromReplicationSlot() { + columns = description.sample_block.cloneEmptyColumns(); bool slot_empty = true; try { - auto tx = std::make_unique(*replication_connection->conn()); + auto tx = std::make_shared(*replication_connection->conn()); /// up_to_lsn is set to NULL, up_to_n_changes is set to max_block_size. std::string query_str = fmt::format( - "select data FROM pg_logical_slot_peek_binary_changes(" + "select lsn, data FROM pg_logical_slot_peek_binary_changes(" "'{}', NULL, NULL, 'publication_names', '{}', 'proto_version', '1')", replication_slot_name, publication_name); pqxx::stream_from stream(*tx, pqxx::from_query, std::string_view(query_str)); @@ -267,17 +363,23 @@ bool PostgreSQLReplicaConsumer::readFromReplicationSlot() { LOG_TRACE(log, "STREAM REPLICATION END"); stream.complete(); + + Block result_rows = description.sample_block.cloneWithColumns(std::move(columns)); + if (result_rows.rows()) + { + syncIntoTable(result_rows); + advanceLSN(tx); + } + tx->commit(); break; } slot_empty = false; - for (const auto idx : ext::range(0, row->size())) - { - LOG_TRACE(log, "Replication message: {}", (*row)[idx]); - decodeReplicationMessage((*row)[idx].c_str(), (*row)[idx].size()); - } + current_lsn.lsn = (*row)[0]; + LOG_TRACE(log, "Replication message: {}", (*row)[1]); + decodeReplicationMessage((*row)[1].c_str(), (*row)[1].size()); } } catch (...) diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h index bb4b4c5033b..ca357236180 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumer.h @@ -3,7 +3,10 @@ #include "PostgreSQLConnection.h" #include #include +#include +#include #include "pqxx/pqxx" +#include namespace DB { @@ -11,8 +14,9 @@ namespace DB struct LSNPosition { std::string lsn; + int64_t lsn_value; - uint64_t getValue() + int64_t getValue() { uint64_t upper_half, lower_half, result; std::sscanf(lsn.data(), "%lX/%lX", &upper_half, &lower_half); @@ -22,6 +26,15 @@ struct LSNPosition // upper_half, lower_half, result); return result; } + + std::string getString() + { + char result[16]; + std::snprintf(result, sizeof(result), "%lX/%lX", (lsn_value >> 32), lsn_value & 0xFFFFFFFF); + //assert(lsn_value == result.getValue()); + std::string ans = result; + return ans; + } }; @@ -34,20 +47,28 @@ public: const std::string & conn_str_, const std::string & replication_slot_name_, const std::string & publication_name_, - const LSNPosition & start_lsn); + const LSNPosition & start_lsn, + const size_t max_block_size_, + StoragePtr nested_storage_); /// Start reading WAL from current_lsn position. Initial data sync from created snapshot already done. void startSynchronization(); void stopSynchronization(); private: - /// Executed by wal_reader_task. A separate thread reads wal and advances lsn when rows were written via copyData. - void WALReaderFunc(); + /// Executed by wal_reader_task. A separate thread reads wal and advances lsn to last commited position + /// after rows were written via copyData. + void replicationStream(); + void stopReplicationStream(); /// Start changes stream from WAL via copy command (up to max_block_size changes). bool readFromReplicationSlot(); void decodeReplicationMessage(const char * replication_message, size_t size); + void insertValue(std::string & value, size_t column_idx); + void syncIntoTable(Block & block); + void advanceLSN(std::shared_ptr ntx); + /// Methods to parse replication message data. void readTupleData(const char * message, size_t & pos, size_t size); void readString(const char * message, size_t & pos, size_t size, String & result); @@ -64,9 +85,18 @@ private: const std::string table_name; PostgreSQLConnectionPtr connection, replication_connection; - LSNPosition current_lsn; + LSNPosition current_lsn, final_lsn; BackgroundSchedulePool::TaskHolder wal_reader_task; + //BackgroundSchedulePool::TaskHolder table_sync_task; std::atomic stop_synchronization = false; + + const size_t max_block_size; + StoragePtr nested_storage; + Block sample_block; + ExternalResultDescription description; + MutableColumns columns; + /// Needed for insertPostgreSQLValue() method to parse array + std::unordered_map array_info; }; } diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumerBuffer.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumerBuffer.cpp deleted file mode 100644 index e8c4ba3d55a..00000000000 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumerBuffer.cpp +++ /dev/null @@ -1,38 +0,0 @@ -#include "PostgreSQLReplicaConsumerBuffer.h" - - -namespace DB -{ - -PostgreSQLReplicaConsumerBuffer::PostgreSQLReplicaConsumerBuffer( - uint64_t max_block_size_) - : ReadBuffer(nullptr, 0) - , rows_data(max_block_size_) -{ -} - - -PostgreSQLReplicaConsumerBuffer::~PostgreSQLReplicaConsumerBuffer() -{ - BufferBase::set(nullptr, 0, 0); -} - - -bool PostgreSQLReplicaConsumerBuffer::nextImpl() -{ - if (!allowed) - return false; - - if (rows_data.tryPop(current_row_data)) - { - auto * new_position = const_cast(current_row_data.data.data()); - BufferBase::set(new_position, current_row_data.data.size(), 0); - allowed = false; - - return true; - } - - return false; -} - -} diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumerBuffer.h b/src/Storages/PostgreSQL/PostgreSQLReplicaConsumerBuffer.h deleted file mode 100644 index 8c8de3a8b68..00000000000 --- a/src/Storages/PostgreSQL/PostgreSQLReplicaConsumerBuffer.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include "buffer_fwd.h" - - -namespace DB -{ - -class PostgreSQLReplicaConsumerBuffer : public ReadBuffer -{ - -public: - PostgreSQLReplicaConsumerBuffer( - uint64_t max_block_size_); - - ~PostgreSQLReplicaConsumerBuffer() override; - - void allowNext() { allowed = true; } - -private: - bool nextImpl() override; - - struct RowData - { - String data; - RowData() : data("") {} - }; - - RowData current_row_data; - ConcurrentBoundedQueue rows_data; - bool allowed = true; -}; - -} diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index 5a68ef65925..7b75c42c7a8 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -28,13 +28,15 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( const std::string & conn_str, std::shared_ptr context_, const std::string & publication_name_, - const std::string & replication_slot_name_) + const std::string & replication_slot_name_, + const size_t max_block_size_) : log(&Poco::Logger::get("PostgreSQLReplicaHandler")) , context(context_) , database_name(database_name_) , table_name(table_name_) , publication_name(publication_name_) , replication_slot(replication_slot_name_) + , max_block_size(max_block_size_) , connection(std::make_shared(conn_str)) { /// Create a replication connection, through which it is possible to execute only commands from streaming replication protocol @@ -56,7 +58,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( void PostgreSQLReplicationHandler::startup(StoragePtr storage) { - helper_table = storage; + nested_storage = storage; startup_task->activateAndSchedule(); } @@ -98,6 +100,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist() assert(!result.empty()); bool publication_exists = (result[0][0].as() == "t"); + /// TODO: check if publication is still valid? if (publication_exists) LOG_TRACE(log, "Publication {} already exists. Using existing version", publication_name); @@ -121,7 +124,7 @@ void PostgreSQLReplicationHandler::createPublication() /// TODO: check replica identity /// Requires changed replica identity for included table to be able to receive old values of updated rows. - /// (ALTER TABLE table_name REPLICA IDENTITY FULL) + /// (ALTER TABLE table_name REPLICA IDENTITY FULL ?) } @@ -173,7 +176,9 @@ void PostgreSQLReplicationHandler::startReplication() connection->conn_str(), replication_slot, publication_name, - start_lsn); + start_lsn, + max_block_size, + nested_storage); LOG_DEBUG(log, "Commiting replication transaction"); ntx->commit(); @@ -203,12 +208,12 @@ void PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_name) insert_context.makeQueryContext(); auto insert = std::make_shared(); - insert->table_id = helper_table->getStorageID(); + insert->table_id = nested_storage->getStorageID(); InterpreterInsertQuery interpreter(insert, insert_context); auto block_io = interpreter.execute(); - const StorageInMemoryMetadata & storage_metadata = helper_table->getInMemoryMetadata(); + const StorageInMemoryMetadata & storage_metadata = nested_storage->getInMemoryMetadata(); auto sample_block = storage_metadata.getSampleBlockNonMaterialized(); PostgreSQLBlockInputStream input(std::move(stx), query_str, sample_block, DEFAULT_BLOCK_SIZE); @@ -296,10 +301,18 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr ntx, st } +void PostgreSQLReplicationHandler::dropPublication(NontransactionPtr ntx) +{ + std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name); + ntx->exec(query_str); +} + + /// Only used when MaterializePostgreSQL table is dropped. -void PostgreSQLReplicationHandler::checkAndDropReplicationSlot() +void PostgreSQLReplicationHandler::removeSlotAndPublication() { auto ntx = std::make_shared(*replication_connection->conn()); + dropPublication(ntx); if (isReplicationSlotExist(ntx, replication_slot)) dropReplicationSlot(ntx, replication_slot, false); ntx->commit(); diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index 2e85bae5cb9..5cc4d336921 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -24,11 +24,12 @@ public: const std::string & conn_str_, std::shared_ptr context_, const std::string & publication_slot_name_, - const std::string & replication_slot_name_); + const std::string & replication_slot_name_, + const size_t max_block_size_); void startup(StoragePtr storage_); void shutdown(); - void checkAndDropReplicationSlot(); + void removeSlotAndPublication(); private: using NontransactionPtr = std::shared_ptr; @@ -41,6 +42,7 @@ private: void createTempReplicationSlot(NontransactionPtr ntx, LSNPosition & start_lsn, std::string & snapshot_name); void createReplicationSlot(NontransactionPtr ntx); void dropReplicationSlot(NontransactionPtr tx, std::string & slot_name, bool use_replication_api); + void dropPublication(NontransactionPtr ntx); void startReplication(); void loadFromSnapshot(std::string & snapshot_name); @@ -53,6 +55,7 @@ private: std::string publication_name, replication_slot; std::string temp_replication_slot; + const size_t max_block_size; PostgreSQLConnectionPtr connection; PostgreSQLConnectionPtr replication_connection; @@ -60,7 +63,7 @@ private: BackgroundSchedulePool::TaskHolder startup_task; std::shared_ptr consumer; - StoragePtr helper_table; + StoragePtr nested_storage; //LSNPosition start_lsn, final_lsn; }; diff --git a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp index bca77f314cd..13cd5321737 100644 --- a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp +++ b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.cpp @@ -27,7 +27,6 @@ #include #include "PostgreSQLReplicationSettings.h" -#include "PostgreSQLReplicaBlockInputStream.h" #include #include @@ -61,13 +60,15 @@ StoragePostgreSQLReplica::StoragePostgreSQLReplica( relative_data_path.resize(relative_data_path.size() - 1); relative_data_path += nested_storage_suffix; + replication_handler = std::make_unique( remote_database_name, remote_table_name, connection_str, global_context, global_context->getMacros()->expand(replication_settings->postgresql_replication_slot_name.value), - global_context->getMacros()->expand(replication_settings->postgresql_publication_name.value) + global_context->getMacros()->expand(replication_settings->postgresql_publication_name.value), + global_context->getSettingsRef().postgresql_replica_max_rows_to_insert.value ); } @@ -180,12 +181,13 @@ void StoragePostgreSQLReplica::startup() { Context context_copy(*global_context); const auto ast_create = getCreateHelperTableQuery(); + auto table_id = getStorageID(); Poco::File path(relative_data_path); if (!path.exists()) { LOG_TRACE(&Poco::Logger::get("StoragePostgreSQLReplica"), - "Creating helper table {}", getStorageID().table_name + nested_storage_suffix); + "Creating helper table {}", table_id.table_name + nested_storage_suffix); InterpreterCreateQuery interpreter(ast_create, context_copy); interpreter.execute(); } @@ -193,8 +195,13 @@ void StoragePostgreSQLReplica::startup() LOG_TRACE(&Poco::Logger::get("StoragePostgreSQLReplica"), "Directory already exists {}", relative_data_path); - nested_storage = createTableFromAST(ast_create->as(), getStorageID().database_name, relative_data_path, context_copy, false).second; - nested_storage->startup(); + nested_storage = DatabaseCatalog::instance().getTable( + StorageID(table_id.database_name, table_id.table_name + nested_storage_suffix), + *global_context); + + //nested_storage = createTableFromAST( + // ast_create->as(), getStorageID().database_name, relative_data_path, context_copy, false).second; + //nested_storage->startup(); replication_handler->startup(nested_storage); } @@ -208,8 +215,7 @@ void StoragePostgreSQLReplica::shutdown() void StoragePostgreSQLReplica::shutdownFinal() { - /// TODO: Under lock? Make sure synchronization stopped. - replication_handler->checkAndDropReplicationSlot(); + replication_handler->removeSlotAndPublication(); dropNested(); } diff --git a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h index c02c9696d87..8dbfeb79bf0 100644 --- a/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h +++ b/src/Storages/PostgreSQL/StoragePostgreSQLReplica.h @@ -19,7 +19,6 @@ #include #include "PostgreSQLReplicationHandler.h" #include "PostgreSQLReplicationSettings.h" -#include "buffer_fwd.h" #include "pqxx/pqxx" namespace DB @@ -46,6 +45,7 @@ public: size_t max_block_size, unsigned num_streams) override; + /// Called right after shutdown() in case of drop query void shutdownFinal(); protected: diff --git a/src/Storages/PostgreSQL/buffer_fwd.h b/src/Storages/PostgreSQL/buffer_fwd.h deleted file mode 100644 index 40ffd64aad3..00000000000 --- a/src/Storages/PostgreSQL/buffer_fwd.h +++ /dev/null @@ -1,9 +0,0 @@ -#pragma once - -namespace DB -{ - -class PostgreSQLReplicaConsumerBuffer; -using ConsumerBufferPtr = std::shared_ptr; - -} diff --git a/src/Storages/PostgreSQL/insertPostgreSQLValue.cpp b/src/Storages/PostgreSQL/insertPostgreSQLValue.cpp new file mode 100644 index 00000000000..5d4723364dc --- /dev/null +++ b/src/Storages/PostgreSQL/insertPostgreSQLValue.cpp @@ -0,0 +1,208 @@ +#include "insertPostgreSQLValue.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +void insertPostgreSQLValue( + IColumn & column, std::string_view value, + const ExternalResultDescription::ValueType type, const DataTypePtr data_type, + std::unordered_map & array_info, size_t idx) +{ + switch (type) + { + case ExternalResultDescription::ValueType::vtUInt8: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ExternalResultDescription::ValueType::vtUInt16: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ExternalResultDescription::ValueType::vtUInt32: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ExternalResultDescription::ValueType::vtUInt64: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ExternalResultDescription::ValueType::vtInt8: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ExternalResultDescription::ValueType::vtInt16: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ExternalResultDescription::ValueType::vtInt32: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ExternalResultDescription::ValueType::vtInt64: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ExternalResultDescription::ValueType::vtFloat32: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ExternalResultDescription::ValueType::vtFloat64: + assert_cast(column).insertValue(pqxx::from_string(value)); + break; + case ExternalResultDescription::ValueType::vtFixedString:[[fallthrough]]; + case ExternalResultDescription::ValueType::vtString: + assert_cast(column).insertData(value.data(), value.size()); + break; + case ExternalResultDescription::ValueType::vtUUID: + assert_cast(column).insert(parse(value.data(), value.size())); + break; + case ExternalResultDescription::ValueType::vtDate: + assert_cast(column).insertValue(UInt16{LocalDate{std::string(value)}.getDayNum()}); + break; + case ExternalResultDescription::ValueType::vtDateTime: + assert_cast(column).insertValue(time_t{LocalDateTime{std::string(value)}}); + break; + case ExternalResultDescription::ValueType::vtDateTime64:[[fallthrough]]; + case ExternalResultDescription::ValueType::vtDecimal32: [[fallthrough]]; + case ExternalResultDescription::ValueType::vtDecimal64: [[fallthrough]]; + case ExternalResultDescription::ValueType::vtDecimal128: [[fallthrough]]; + case ExternalResultDescription::ValueType::vtDecimal256: + { + ReadBufferFromString istr(value); + data_type->deserializeAsWholeText(column, istr, FormatSettings{}); + break; + } + case ExternalResultDescription::ValueType::vtArray: + { + pqxx::array_parser parser{value}; + std::pair parsed = parser.get_next(); + + size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info[idx].num_dimensions; + const auto parse_value = array_info[idx].pqxx_parser; + std::vector> dimensions(expected_dimensions + 1); + + while (parsed.first != pqxx::array_parser::juncture::done) + { + if ((parsed.first == pqxx::array_parser::juncture::row_start) && (++dimension > expected_dimensions)) + throw Exception("Got more dimensions than expected", ErrorCodes::BAD_ARGUMENTS); + + else if (parsed.first == pqxx::array_parser::juncture::string_value) + dimensions[dimension].emplace_back(parse_value(parsed.second)); + + else if (parsed.first == pqxx::array_parser::juncture::null_value) + dimensions[dimension].emplace_back(array_info[idx].default_value); + + else if (parsed.first == pqxx::array_parser::juncture::row_end) + { + max_dimension = std::max(max_dimension, dimension); + + if (--dimension == 0) + break; + + dimensions[dimension].emplace_back(Array(dimensions[dimension + 1].begin(), dimensions[dimension + 1].end())); + dimensions[dimension + 1].clear(); + } + + parsed = parser.get_next(); + } + + if (max_dimension < expected_dimensions) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Got less dimensions than expected. ({} instead of {})", max_dimension, expected_dimensions); + + assert_cast(column).insert(Array(dimensions[1].begin(), dimensions[1].end())); + break; + } + } +} + + +void preparePostgreSQLArrayInfo( + std::unordered_map & array_info, size_t column_idx, const DataTypePtr data_type) +{ + const auto * array_type = typeid_cast(data_type.get()); + auto nested = array_type->getNestedType(); + + size_t count_dimensions = 1; + while (isArray(nested)) + { + ++count_dimensions; + nested = typeid_cast(nested.get())->getNestedType(); + } + + Field default_value = nested->getDefault(); + if (nested->isNullable()) + nested = static_cast(nested.get())->getNestedType(); + + WhichDataType which(nested); + std::function parser; + + if (which.isUInt8() || which.isUInt16()) + parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; + else if (which.isInt8() || which.isInt16()) + parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; + else if (which.isUInt32()) + parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; + else if (which.isInt32()) + parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; + else if (which.isUInt64()) + parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; + else if (which.isInt64()) + parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; + else if (which.isFloat32()) + parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; + else if (which.isFloat64()) + parser = [](std::string & field) -> Field { return pqxx::from_string(field); }; + else if (which.isString() || which.isFixedString()) + parser = [](std::string & field) -> Field { return field; }; + else if (which.isDate()) + parser = [](std::string & field) -> Field { return UInt16{LocalDate{field}.getDayNum()}; }; + else if (which.isDateTime()) + parser = [](std::string & field) -> Field { return time_t{LocalDateTime{field}}; }; + else if (which.isDecimal32()) + parser = [nested](std::string & field) -> Field + { + const auto & type = typeid_cast *>(nested.get()); + DataTypeDecimal res(getDecimalPrecision(*type), getDecimalScale(*type)); + return convertFieldToType(field, res); + }; + else if (which.isDecimal64()) + parser = [nested](std::string & field) -> Field + { + const auto & type = typeid_cast *>(nested.get()); + DataTypeDecimal res(getDecimalPrecision(*type), getDecimalScale(*type)); + return convertFieldToType(field, res); + }; + else if (which.isDecimal128()) + parser = [nested](std::string & field) -> Field + { + const auto & type = typeid_cast *>(nested.get()); + DataTypeDecimal res(getDecimalPrecision(*type), getDecimalScale(*type)); + return convertFieldToType(field, res); + }; + else if (which.isDecimal256()) + parser = [nested](std::string & field) -> Field + { + const auto & type = typeid_cast *>(nested.get()); + DataTypeDecimal res(getDecimalPrecision(*type), getDecimalScale(*type)); + return convertFieldToType(field, res); + }; + else + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type conversion to {} is not supported", nested->getName()); + + array_info[column_idx] = {count_dimensions, default_value, parser}; +} +} + diff --git a/src/Storages/PostgreSQL/insertPostgreSQLValue.h b/src/Storages/PostgreSQL/insertPostgreSQLValue.h new file mode 100644 index 00000000000..1582d35d096 --- /dev/null +++ b/src/Storages/PostgreSQL/insertPostgreSQLValue.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include +#include +#include +#include + + +namespace DB +{ + +struct PostgreSQLArrayInfo +{ + size_t num_dimensions; + Field default_value; + std::function pqxx_parser; +}; + + +void insertPostgreSQLValue( + IColumn & column, std::string_view value, + const ExternalResultDescription::ValueType type, const DataTypePtr data_type, + std::unordered_map & array_info, size_t idx); + +void preparePostgreSQLArrayInfo( + std::unordered_map & array_info, size_t column_idx, const DataTypePtr data_type); + +} diff --git a/tests/integration/test_storage_postgresql_replica/test.py b/tests/integration/test_storage_postgresql_replica/test.py index ccf0b2eee13..487ee2a35cb 100644 --- a/tests/integration/test_storage_postgresql_replica/test.py +++ b/tests/integration/test_storage_postgresql_replica/test.py @@ -13,7 +13,7 @@ instance = cluster.add_instance('instance', main_configs=['configs/log_conf.xml' postgres_table_template = """ CREATE TABLE IF NOT EXISTS {} ( - key Integer NOT NULL, value Integer, PRIMARY KEY (key)) + key Integer NOT NULL, value Integer) """ def get_postgres_conn(database=False): @@ -108,6 +108,70 @@ def test_no_connection_at_startup(started_cluster): cursor.execute('DROP TABLE postgresql_replica;') postgresql_replica_check_result(result, True) + +def test_detach_attach_is_ok(started_cluster): + conn = get_postgres_conn(True) + cursor = conn.cursor() + create_postgres_table(cursor, 'postgresql_replica'); + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)") + + instance.query(''' + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) + ENGINE = PostgreSQLReplica( + 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') + PRIMARY KEY key; + ''') + + result = instance.query('SELECT * FROM test.postgresql_replica;') + postgresql_replica_check_result(result, True) + + instance.query('DETACH TABLE test.postgresql_replica') + instance.query('ATTACH TABLE test.postgresql_replica') + + result = instance.query('SELECT * FROM test.postgresql_replica;') + cursor.execute('DROP TABLE postgresql_replica;') + postgresql_replica_check_result(result, True) + + +def test_replicating_inserts(started_cluster): + conn = get_postgres_conn(True) + cursor = conn.cursor() + create_postgres_table(cursor, 'postgresql_replica'); + + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)") + + instance.query(''' + CREATE TABLE test.postgresql_replica (key UInt64, value UInt64) + ENGINE = PostgreSQLReplica( + 'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword') + PRIMARY KEY key; + ''') + + result = instance.query('SELECT count() FROM test.postgresql_replica;') + assert(int(result) == 10) + + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 10 + number, 10 + number from numbers(10)") + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 20 + number, 20 + number from numbers(10)") + + time.sleep(4) + + result = instance.query('SELECT count() FROM test.postgresql_replica;') + assert(int(result) == 30) + + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 30 + number, 30 + number from numbers(10)") + instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 40 + number, 40 + number from numbers(10)") + + time.sleep(4) + + result = instance.query('SELECT count() FROM test.postgresql_replica;') + assert(int(result) == 50) + + result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;') + + cursor.execute('DROP TABLE postgresql_replica;') + postgresql_replica_check_result(result, True) + + if __name__ == '__main__': cluster.start() input("Cluster created, press any key to destroy...")