Replicate insert queries

kssenii 2021-01-31 19:03:03 +00:00
parent 179a558a04
commit 4aadd0c3f2
16 changed files with 513 additions and 527 deletions

View File

@ -22,11 +22,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
std::unique_ptr<pqxx::work> tx_,
const std::string & query_str_,
@ -38,8 +33,8 @@ PostgreSQLBlockInputStream::PostgreSQLBlockInputStream(
{
description.init(sample_block);
for (const auto idx : ext::range(0, description.sample_block.columns()))
if (description.types[idx].first == ValueType::vtArray)
prepareArrayInfo(idx, description.sample_block.getByPosition(idx).type);
if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)
preparePostgreSQLArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type);
/// pqxx::stream_from uses COPY command, will get error if ';' is present
if (query_str.ends_with(';'))
query_str.resize(query_str.size() - 1);
@ -80,12 +75,17 @@ Block PostgreSQLBlockInputStream::readImpl()
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertValue(column_nullable.getNestedColumn(), (*row)[idx], description.types[idx].first, data_type.getNestedType(), idx);
insertPostgreSQLValue(
column_nullable.getNestedColumn(), (*row)[idx],
description.types[idx].first, data_type.getNestedType(), array_info, idx);
column_nullable.getNullMapData().emplace_back(0);
}
else
{
insertValue(*columns[idx], (*row)[idx], description.types[idx].first, sample.type, idx);
insertPostgreSQLValue(
*columns[idx], (*row)[idx], description.types[idx].first, sample.type, array_info, idx);
}
}
else
@ -113,183 +113,6 @@ void PostgreSQLBlockInputStream::readSuffix()
}
void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type, size_t idx)
{
switch (type)
{
case ValueType::vtUInt8:
assert_cast<ColumnUInt8 &>(column).insertValue(pqxx::from_string<uint16_t>(value));
break;
case ValueType::vtUInt16:
assert_cast<ColumnUInt16 &>(column).insertValue(pqxx::from_string<uint16_t>(value));
break;
case ValueType::vtUInt32:
assert_cast<ColumnUInt32 &>(column).insertValue(pqxx::from_string<uint32_t>(value));
break;
case ValueType::vtUInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(pqxx::from_string<uint64_t>(value));
break;
case ValueType::vtInt8:
assert_cast<ColumnInt8 &>(column).insertValue(pqxx::from_string<int16_t>(value));
break;
case ValueType::vtInt16:
assert_cast<ColumnInt16 &>(column).insertValue(pqxx::from_string<int16_t>(value));
break;
case ValueType::vtInt32:
assert_cast<ColumnInt32 &>(column).insertValue(pqxx::from_string<int32_t>(value));
break;
case ValueType::vtInt64:
assert_cast<ColumnInt64 &>(column).insertValue(pqxx::from_string<int64_t>(value));
break;
case ValueType::vtFloat32:
assert_cast<ColumnFloat32 &>(column).insertValue(pqxx::from_string<float>(value));
break;
case ValueType::vtFloat64:
assert_cast<ColumnFloat64 &>(column).insertValue(pqxx::from_string<double>(value));
break;
case ValueType::vtFixedString:[[fallthrough]];
case ValueType::vtString:
assert_cast<ColumnString &>(column).insertData(value.data(), value.size());
break;
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
break;
case ValueType::vtDate:
assert_cast<ColumnUInt16 &>(column).insertValue(UInt16{LocalDate{std::string(value)}.getDayNum()});
break;
case ValueType::vtDateTime:
assert_cast<ColumnUInt32 &>(column).insertValue(time_t{LocalDateTime{std::string(value)}});
break;
case ValueType::vtDateTime64:[[fallthrough]];
case ValueType::vtDecimal32: [[fallthrough]];
case ValueType::vtDecimal64: [[fallthrough]];
case ValueType::vtDecimal128: [[fallthrough]];
case ValueType::vtDecimal256:
{
ReadBufferFromString istr(value);
data_type->deserializeAsWholeText(column, istr, FormatSettings{});
break;
}
case ValueType::vtArray:
{
pqxx::array_parser parser{value};
std::pair<pqxx::array_parser::juncture, std::string> parsed = parser.get_next();
size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info[idx].num_dimensions;
const auto parse_value = array_info[idx].pqxx_parser;
std::vector<std::vector<Field>> dimensions(expected_dimensions + 1);
while (parsed.first != pqxx::array_parser::juncture::done)
{
if ((parsed.first == pqxx::array_parser::juncture::row_start) && (++dimension > expected_dimensions))
throw Exception("Got more dimensions than expected", ErrorCodes::BAD_ARGUMENTS);
else if (parsed.first == pqxx::array_parser::juncture::string_value)
dimensions[dimension].emplace_back(parse_value(parsed.second));
else if (parsed.first == pqxx::array_parser::juncture::null_value)
dimensions[dimension].emplace_back(array_info[idx].default_value);
else if (parsed.first == pqxx::array_parser::juncture::row_end)
{
max_dimension = std::max(max_dimension, dimension);
if (--dimension == 0)
break;
dimensions[dimension].emplace_back(Array(dimensions[dimension + 1].begin(), dimensions[dimension + 1].end()));
dimensions[dimension + 1].clear();
}
parsed = parser.get_next();
}
if (max_dimension < expected_dimensions)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Got less dimensions than expected. ({} instead of {})", max_dimension, expected_dimensions);
assert_cast<ColumnArray &>(column).insert(Array(dimensions[1].begin(), dimensions[1].end()));
break;
}
}
}
void PostgreSQLBlockInputStream::prepareArrayInfo(size_t column_idx, const DataTypePtr data_type)
{
const auto * array_type = typeid_cast<const DataTypeArray *>(data_type.get());
auto nested = array_type->getNestedType();
size_t count_dimensions = 1;
while (isArray(nested))
{
++count_dimensions;
nested = typeid_cast<const DataTypeArray *>(nested.get())->getNestedType();
}
Field default_value = nested->getDefault();
if (nested->isNullable())
nested = static_cast<const DataTypeNullable *>(nested.get())->getNestedType();
WhichDataType which(nested);
std::function<Field(std::string & fields)> parser;
if (which.isUInt8() || which.isUInt16())
parser = [](std::string & field) -> Field { return pqxx::from_string<uint16_t>(field); };
else if (which.isInt8() || which.isInt16())
parser = [](std::string & field) -> Field { return pqxx::from_string<int16_t>(field); };
else if (which.isUInt32())
parser = [](std::string & field) -> Field { return pqxx::from_string<uint32_t>(field); };
else if (which.isInt32())
parser = [](std::string & field) -> Field { return pqxx::from_string<int32_t>(field); };
else if (which.isUInt64())
parser = [](std::string & field) -> Field { return pqxx::from_string<uint64_t>(field); };
else if (which.isInt64())
parser = [](std::string & field) -> Field { return pqxx::from_string<int64_t>(field); };
else if (which.isFloat32())
parser = [](std::string & field) -> Field { return pqxx::from_string<float>(field); };
else if (which.isFloat64())
parser = [](std::string & field) -> Field { return pqxx::from_string<double>(field); };
else if (which.isString() || which.isFixedString())
parser = [](std::string & field) -> Field { return field; };
else if (which.isDate())
parser = [](std::string & field) -> Field { return UInt16{LocalDate{field}.getDayNum()}; };
else if (which.isDateTime())
parser = [](std::string & field) -> Field { return time_t{LocalDateTime{field}}; };
else if (which.isDecimal32())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal32> *>(nested.get());
DataTypeDecimal<Decimal32> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else if (which.isDecimal64())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal64> *>(nested.get());
DataTypeDecimal<Decimal64> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else if (which.isDecimal128())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal128> *>(nested.get());
DataTypeDecimal<Decimal128> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else if (which.isDecimal256())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal256> *>(nested.get());
DataTypeDecimal<Decimal256> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type conversion to {} is not supported", nested->getName());
array_info[column_idx] = {count_dimensions, default_value, parser};
}
}
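The trailing-semicolon trim above exists because pqxx::stream_from pushes the query through COPY, which rejects a terminating ';'. A minimal, self-contained illustration of the same preparation step, assuming libpqxx 7 and a placeholder DSN and table:

#include <pqxx/pqxx>
#include <iostream>
#include <string>

int main()
{
    std::string query_str = "SELECT key, value FROM some_table;";

    /// COPY (used internally by pqxx::stream_from) fails on a trailing ';'.
    if (query_str.ends_with(';'))
        query_str.resize(query_str.size() - 1);

    pqxx::connection conn{"postgresql://postgres@localhost/postgres"};   /// placeholder DSN
    pqxx::work tx{conn};
    pqxx::stream_from stream(tx, pqxx::from_query, std::string_view(query_str));

    while (const auto * row = stream.read_row())          /// one std::vector<pqxx::zview> per row
        std::cout << (*row)[0] << ", " << (*row)[1] << "\n";

    stream.complete();
    tx.commit();
    return 0;
}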

View File

@ -10,6 +10,7 @@
#include <Core/ExternalResultDescription.h>
#include <Core/Field.h>
#include <pqxx/pqxx>
#include <Storages/PostgreSQL/insertPostgreSQLValue.h>
namespace DB
@ -29,19 +30,14 @@ public:
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
using ValueType = ExternalResultDescription::ValueType;
void readPrefix() override;
Block readImpl() override;
void readSuffix() override;
void insertValue(IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type, size_t idx);
void insertDefaultValue(IColumn & column, const IColumn & sample_column)
{
column.insertFrom(sample_column, 0);
}
void prepareArrayInfo(size_t column_idx, const DataTypePtr data_type);
String query_str;
const UInt64 max_block_size;
@ -51,13 +47,7 @@ private:
std::unique_ptr<pqxx::work> tx;
std::unique_ptr<pqxx::stream_from> stream;
struct ArrayInfo
{
size_t num_dimensions;
Field default_value;
std::function<Field(std::string & field)> pqxx_parser;
};
std::unordered_map<size_t, ArrayInfo> array_info;
std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;
};
}

View File

@ -1,149 +0,0 @@
#include "PostgreSQLReplicaBlockInputStream.h"
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace DB
{
PostgreSQLReplicaBlockInputStream::PostgreSQLReplicaBlockInputStream(
StoragePostgreSQLReplica & storage_,
ConsumerBufferPtr buffer_,
const StorageMetadataPtr & metadata_snapshot_,
std::shared_ptr<Context> context_,
const Names & columns,
size_t max_block_size_)
: storage(storage_)
, buffer(buffer_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
, column_names(columns)
, max_block_size(max_block_size_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, sample_block(non_virtual_header)
, virtual_header(metadata_snapshot->getSampleBlockForColumns({}, storage.getVirtuals(), storage.getStorageID()))
{
for (const auto & column : virtual_header)
sample_block.insert(column);
}
PostgreSQLReplicaBlockInputStream::~PostgreSQLReplicaBlockInputStream()
{
}
void PostgreSQLReplicaBlockInputStream::readPrefixImpl()
{
}
Block PostgreSQLReplicaBlockInputStream::readImpl()
{
if (!buffer || finished)
return Block();
finished = true;
MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto input_format = FormatFactory::instance().getInputFormat(
"Values", *buffer, non_virtual_header, *context, max_block_size);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);
port.setNeeded();
auto read_rabbitmq_message = [&]
{
size_t new_rows = 0;
while (true)
{
auto status = input_format->prepare();
switch (status)
{
case IProcessor::Status::Ready:
input_format->work();
break;
case IProcessor::Status::Finished:
input_format->resetParser();
return new_rows;
case IProcessor::Status::PortFull:
{
auto chunk = port.pull();
auto chunk_rows = chunk.getNumRows();
new_rows += chunk_rows;
auto columns = chunk.detachColumns();
for (size_t i = 0, s = columns.size(); i < s; ++i)
{
result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
}
break;
}
case IProcessor::Status::NeedData:
case IProcessor::Status::Async:
case IProcessor::Status::ExpandPipeline:
throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
}
}
};
size_t total_rows = 0;
while (true)
{
if (buffer->eof())
break;
auto new_rows = read_rabbitmq_message();
if (new_rows)
{
//auto timestamp = buffer->getTimestamp();
//for (size_t i = 0; i < new_rows; ++i)
//{
// virtual_columns[0]->insert(timestamp);
//}
total_rows = total_rows + new_rows;
}
buffer->allowNext();
if (total_rows >= max_block_size || !checkTimeLimit())
break;
}
if (total_rows == 0)
return Block();
auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
result_block.insert(column);
return result_block;
}
void PostgreSQLReplicaBlockInputStream::readSuffixImpl()
{
}
}
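The deleted stream above (a leftover of the earlier RabbitMQ-style design, as the read_rabbitmq_message lambda suggests) pulled rows out of a 'Values' input format by driving the processor state machine by hand. The core of that pattern, reduced to a sketch against the ClickHouse IProcessor/InputPort interfaces it used; not a drop-in replacement:

/// Drive a single input-format processor until it reports Finished,
/// appending every chunk it produces to result_columns.
size_t pullAllRows(IProcessor & format, InputPort & port, MutableColumns & result_columns)
{
    size_t rows = 0;
    while (true)
    {
        switch (format.prepare())
        {
            case IProcessor::Status::Ready:          /// the format can parse more input
                format.work();
                break;
            case IProcessor::Status::PortFull:       /// a chunk is waiting on the output port
            {
                auto chunk = port.pull();
                rows += chunk.getNumRows();
                auto cols = chunk.detachColumns();
                for (size_t i = 0; i < cols.size(); ++i)
                    result_columns[i]->insertRangeFrom(*cols[i], 0, cols[i]->size());
                break;
            }
            case IProcessor::Status::Finished:       /// nothing left to read
                return rows;
            default:                                 /// NeedData/Async/ExpandPipeline are not expected here
                throw Exception("Unexpected processor status", ErrorCodes::LOGICAL_ERROR);
        }
    }
}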

View File

@ -1,47 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include "StoragePostgreSQLReplica.h"
#include "PostgreSQLReplicaConsumerBuffer.h"
#include "buffer_fwd.h"
namespace DB
{
class PostgreSQLReplicaBlockInputStream : public IBlockInputStream
{
public:
PostgreSQLReplicaBlockInputStream(
StoragePostgreSQLReplica & storage_,
ConsumerBufferPtr buffer_,
const StorageMetadataPtr & metadata_snapshot_,
std::shared_ptr<Context> context_,
const Names & columns,
size_t max_block_size_);
~PostgreSQLReplicaBlockInputStream() override;
String getName() const override { return storage.getName(); }
Block getHeader() const override { return sample_block; }
void readPrefixImpl() override;
Block readImpl() override;
void readSuffixImpl() override;
private:
StoragePostgreSQLReplica & storage;
ConsumerBufferPtr buffer;
StorageMetadataPtr metadata_snapshot;
std::shared_ptr<Context> context;
Names column_names;
const size_t max_block_size;
bool finished = false;
const Block non_virtual_header;
Block sample_block;
const Block virtual_header;
};
}

View File

@ -2,13 +2,30 @@
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <ext/range.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/FieldVisitors.h>
#include <Common/hex.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <DataStreams/copyData.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
@ -17,9 +34,9 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static const auto wal_reader_reschedule_ms = 500;
static const auto reschedule_ms = 500;
static const auto max_thread_work_duration_ms = 60000;
static const auto max_empty_slot_reads = 20;
static const auto max_empty_slot_reads = 2;
PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer(
std::shared_ptr<Context> context_,
@ -27,7 +44,9 @@ PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer(
const std::string & conn_str,
const std::string & replication_slot_name_,
const std::string & publication_name_,
const LSNPosition & start_lsn)
const LSNPosition & start_lsn,
const size_t max_block_size_,
StoragePtr nested_storage_)
: log(&Poco::Logger::get("PostgreSQLReplicaConsumer"))
, context(context_)
, replication_slot_name(replication_slot_name_)
@ -35,40 +54,49 @@ PostgreSQLReplicaConsumer::PostgreSQLReplicaConsumer(
, table_name(table_name_)
, connection(std::make_shared<PostgreSQLConnection>(conn_str))
, current_lsn(start_lsn)
, max_block_size(max_block_size_)
, nested_storage(nested_storage_)
, sample_block(nested_storage->getInMemoryMetadata().getSampleBlock())
{
replication_connection = std::make_shared<PostgreSQLConnection>(fmt::format("{} replication=database", conn_str));
wal_reader_task = context->getSchedulePool().createTask("PostgreSQLReplicaWALReader", [this]{ WALReaderFunc(); });
description.init(sample_block);
for (const auto idx : ext::range(0, description.sample_block.columns()))
if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)
preparePostgreSQLArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type);
columns = description.sample_block.cloneEmptyColumns();
wal_reader_task = context->getSchedulePool().createTask("PostgreSQLReplicaWALReader", [this]{ replicationStream(); });
wal_reader_task->deactivate();
}
void PostgreSQLReplicaConsumer::startSynchronization()
{
//wal_reader_task->activateAndSchedule();
wal_reader_task->activateAndSchedule();
}
void PostgreSQLReplicaConsumer::stopSynchronization()
{
stop_synchronization.store(true);
if (wal_reader_task)
wal_reader_task->deactivate();
wal_reader_task->deactivate();
}
void PostgreSQLReplicaConsumer::WALReaderFunc()
void PostgreSQLReplicaConsumer::replicationStream()
{
size_t count_empty_slot_reads = 0;
auto start_time = std::chrono::steady_clock::now();
LOG_TRACE(log, "Starting synchronization thread");
LOG_TRACE(log, "Starting replication stream");
while (!stop_synchronization)
{
if (!readFromReplicationSlot() && ++count_empty_slot_reads == max_empty_slot_reads)
{
LOG_TRACE(log, "Reschedule synchronization. Replication slot is empty.");
LOG_TRACE(log, "Reschedule replication stream. Replication slot is empty.");
break;
}
else
@ -78,13 +106,38 @@ void PostgreSQLReplicaConsumer::WALReaderFunc()
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
if (duration.count() > max_thread_work_duration_ms)
{
LOG_TRACE(log, "Reschedule synchronization. Thread work duration limit exceeded.");
LOG_TRACE(log, "Reschedule replication stream. Thread work duration limit exceeded.");
break;
}
}
if (!stop_synchronization)
wal_reader_task->scheduleAfter(wal_reader_reschedule_ms);
wal_reader_task->scheduleAfter(reschedule_ms);
}
void PostgreSQLReplicaConsumer::insertValue(std::string & value, size_t column_idx)
{
const auto & sample = description.sample_block.getByPosition(column_idx);
bool is_nullable = description.types[column_idx].second;
LOG_TRACE(log, "INSERTING VALUE {}", value);
if (is_nullable)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[column_idx]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertPostgreSQLValue(
column_nullable.getNestedColumn(), value,
description.types[column_idx].first, data_type.getNestedType(), array_info, column_idx);
column_nullable.getNullMapData().emplace_back(0);
}
else
{
insertPostgreSQLValue(
*columns[column_idx], value, description.types[column_idx].first, sample.type, array_info, column_idx);
}
}
@ -150,17 +203,24 @@ void PostgreSQLReplicaConsumer::readTupleData(const char * message, size_t & pos
Int16 num_columns = readInt16(message, pos);
/// 'n' means NULL, 'u' means unchanged TOASTed value, 't' means text-formatted data
LOG_DEBUG(log, "num_columns {}", num_columns);
for (int k = 0; k < num_columns; ++k)
for (int column_idx = 0; column_idx < num_columns; ++column_idx)
{
char identifier = readInt8(message, pos);
Int32 col_len = readInt32(message, pos);
String result;
String value;
for (int i = 0; i < col_len; ++i)
{
result += readInt8(message, pos);
value += readInt8(message, pos);
}
LOG_DEBUG(log, "identifier {}, col_len {}, result {}", identifier, col_len, result);
insertValue(value, column_idx);
LOG_DEBUG(log, "identifier {}, col_len {}, value {}", identifier, col_len, value);
}
String val = "1";
insertValue(val, num_columns);
insertValue(val, num_columns + 1);
//readString(message, pos, size, result);
}
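readTupleData() walks the TupleData section of a pgoutput Insert message: a 16-bit column count, then one entry per column made of a one-byte kind and, for text ('t') entries, a 32-bit length followed by that many bytes; per the protocol, 'n' (null) and 'u' (unchanged TOAST) entries carry no length or payload. The two extra insertValue() calls after the loop appear to fill two trailing columns of the nested table with a constant '1'. A self-contained sketch of the same walk, using hypothetical read_u16/read_u32 helpers over a raw big-endian byte buffer, whereas the commit's readInt* helpers parse the textual message returned by the slot:

#include <cstdint>
#include <string>
#include <vector>

/// Decode the TupleData section of a pgoutput Insert message. 't' entries carry
/// an Int32 length plus text data; 'n' and 'u' entries are the kind byte alone.
std::vector<std::string> readTupleDataSketch(const uint8_t * data, size_t & pos)
{
    auto read_u16 = [&]() -> uint16_t { uint16_t v = (data[pos] << 8) | data[pos + 1]; pos += 2; return v; };
    auto read_u32 = [&]() -> uint32_t { uint32_t v = 0; for (int i = 0; i < 4; ++i) v = (v << 8) | data[pos++]; return v; };

    std::vector<std::string> values;
    const uint16_t num_columns = read_u16();
    for (uint16_t i = 0; i < num_columns; ++i)
    {
        const char kind = static_cast<char>(data[pos++]);
        if (kind == 't')
        {
            const uint32_t len = read_u32();
            values.emplace_back(reinterpret_cast<const char *>(data + pos), len);
            pos += len;
        }
        else
            values.emplace_back();   /// NULL or unchanged TOAST: nothing follows the kind byte
    }
    return values;
}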
@ -171,6 +231,7 @@ void PostgreSQLReplicaConsumer::decodeReplicationMessage(const char * replicatio
size_t pos = 2;
char type = readInt8(replication_message, pos);
LOG_TRACE(log, "TYPE: {}", type);
switch (type)
{
@ -180,6 +241,7 @@ void PostgreSQLReplicaConsumer::decodeReplicationMessage(const char * replicatio
Int64 transaction_commit_timestamp = readInt64(replication_message, pos);
LOG_DEBUG(log, "transaction lsn {}, transaction commit timestamp {}",
transaction_end_lsn, transaction_commit_timestamp);
//current_lsn.lsn_value = transaction_end_lsn;
break;
}
case 'C': // Commit
@ -191,6 +253,7 @@ void PostgreSQLReplicaConsumer::decodeReplicationMessage(const char * replicatio
Int64 transaction_commit_timestamp = readInt64(replication_message, pos);
LOG_DEBUG(log, "commit lsn {}, transaction lsn {}, transaction commit timestamp {}",
commit_lsn, transaction_end_lsn, transaction_commit_timestamp);
final_lsn.lsn = current_lsn.lsn;
break;
}
case 'O': // Origin
@ -245,16 +308,49 @@ void PostgreSQLReplicaConsumer::decodeReplicationMessage(const char * replicatio
}
void PostgreSQLReplicaConsumer::syncIntoTable(Block & block)
{
Context insert_context(*context);
insert_context.makeQueryContext();
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = nested_storage->getStorageID();
InterpreterInsertQuery interpreter(insert, insert_context);
auto block_io = interpreter.execute();
OneBlockInputStream input(block);
copyData(input, *block_io.out);
LOG_TRACE(log, "TABLE SYNC END");
}
void PostgreSQLReplicaConsumer::advanceLSN(std::shared_ptr<pqxx::nontransaction> ntx)
{
LOG_TRACE(log, "CURRENT LSN FROM TO {}", final_lsn.lsn);
std::string query_str = fmt::format("SELECT pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn.lsn);
pqxx::result result{ntx->exec(query_str)};
if (!result.empty())
{
std::string s1 = result[0].size() > 0 && !result[0][0].is_null() ? result[0][0].as<std::string>() : "NULL";
std::string s2 = result[0].size() > 1 && !result[0][1].is_null() ? result[0][1].as<std::string>() : "NULL";
LOG_TRACE(log, "ADVANCE LSN: {} and {}", s1, s2);
}
}
/// Read binary changes from replication slot via copy command.
bool PostgreSQLReplicaConsumer::readFromReplicationSlot()
{
columns = description.sample_block.cloneEmptyColumns();
bool slot_empty = true;
try
{
auto tx = std::make_unique<pqxx::nontransaction>(*replication_connection->conn());
auto tx = std::make_shared<pqxx::nontransaction>(*replication_connection->conn());
/// up_to_lsn is set to NULL, up_to_n_changes is set to max_block_size.
std::string query_str = fmt::format(
"select data FROM pg_logical_slot_peek_binary_changes("
"select lsn, data FROM pg_logical_slot_peek_binary_changes("
"'{}', NULL, NULL, 'publication_names', '{}', 'proto_version', '1')",
replication_slot_name, publication_name);
pqxx::stream_from stream(*tx, pqxx::from_query, std::string_view(query_str));
@ -267,17 +363,23 @@ bool PostgreSQLReplicaConsumer::readFromReplicationSlot()
{
LOG_TRACE(log, "STREAM REPLICATION END");
stream.complete();
Block result_rows = description.sample_block.cloneWithColumns(std::move(columns));
if (result_rows.rows())
{
syncIntoTable(result_rows);
advanceLSN(tx);
}
tx->commit();
break;
}
slot_empty = false;
for (const auto idx : ext::range(0, row->size()))
{
LOG_TRACE(log, "Replication message: {}", (*row)[idx]);
decodeReplicationMessage((*row)[idx].c_str(), (*row)[idx].size());
}
current_lsn.lsn = (*row)[0];
LOG_TRACE(log, "Replication message: {}", (*row)[1]);
decodeReplicationMessage((*row)[1].c_str(), (*row)[1].size());
}
}
catch (...)
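Taken together, readFromReplicationSlot() now runs a peek-then-advance cycle: changes are peeked (not consumed) from the slot, decoded into the accumulated columns, flushed into the nested table via syncIntoTable(), and only then is the slot advanced to final_lsn, so data is not lost if the process fails before the INSERT completes. The SQL involved, condensed into one sketch with placeholder slot/publication names and '0/0' standing in for final_lsn:

void syncRoundSketch(pqxx::connection & replication_conn)
{
    auto tx = std::make_shared<pqxx::nontransaction>(replication_conn);

    /// Peek pending changes for the publication without consuming them.
    std::string peek_query =
        "select lsn, data FROM pg_logical_slot_peek_binary_changes("
        "'slot', NULL, NULL, 'publication_names', 'pub', 'proto_version', '1')";
    pqxx::stream_from stream(*tx, pqxx::from_query, std::string_view(peek_query));

    while (const auto * row = stream.read_row())
    {
        /// (*row)[0] is the change's LSN, (*row)[1] the pgoutput message: decode it
        /// and append the parsed values to the accumulated MutableColumns here.
        (void)row;   /// decoding elided in this sketch
    }
    stream.complete();

    /// After the block has been written into the nested table, move the slot
    /// forward so the server may discard WAL up to the last committed position.
    tx->exec("SELECT pg_replication_slot_advance('slot', '0/0')");
    tx->commit();
}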

View File

@ -3,7 +3,10 @@
#include "PostgreSQLConnection.h"
#include <Core/BackgroundSchedulePool.h>
#include <common/logger_useful.h>
#include <Storages/IStorage.h>
#include <Core/ExternalResultDescription.h>
#include "pqxx/pqxx"
#include <Storages/PostgreSQL/insertPostgreSQLValue.h>
namespace DB
{
@ -11,8 +14,9 @@ namespace DB
struct LSNPosition
{
std::string lsn;
int64_t lsn_value;
uint64_t getValue()
int64_t getValue()
{
uint64_t upper_half, lower_half, result;
std::sscanf(lsn.data(), "%lX/%lX", &upper_half, &lower_half);
@ -22,6 +26,15 @@ struct LSNPosition
// upper_half, lower_half, result);
return result;
}
std::string getString()
{
char result[16];
std::snprintf(result, sizeof(result), "%lX/%lX", (lsn_value >> 32), lsn_value & 0xFFFFFFFF);
//assert(lsn_value == result.getValue());
std::string ans = result;
return ans;
}
};
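LSNPosition keeps both forms of a PostgreSQL LSN: the textual 'upper/lower' notation (the high and low 32 bits printed as hex, separated by '/') and the 64-bit value. A self-contained worked example of the round trip; note that getString()'s 16-byte buffer is tight, since the worst case of eight hex digits on each side plus the '/' and the terminator needs 18 bytes:

#include <cinttypes>
#include <cstdint>
#include <cstdio>

int main()
{
    /// Text -> value: the part before '/' is the high 32 bits, the part after it the low 32 bits.
    uint64_t upper = 0, lower = 0;
    std::sscanf("16/B374D848", "%" SCNx64 "/%" SCNx64, &upper, &lower);
    const uint64_t value = (upper << 32) + lower;

    /// Value -> text: print the two halves back as hex.
    char text[32];   /// 8 + 1 + 8 + 1 bytes already suffice; 16 would not in the worst case
    std::snprintf(text, sizeof(text), "%" PRIX64 "/%" PRIX64, value >> 32, value & 0xFFFFFFFF);

    std::printf("%" PRIu64 " <-> %s\n", value, text);   /// 97500059720 <-> 16/B374D848
    return 0;
}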
@ -34,20 +47,28 @@ public:
const std::string & conn_str_,
const std::string & replication_slot_name_,
const std::string & publication_name_,
const LSNPosition & start_lsn);
const LSNPosition & start_lsn,
const size_t max_block_size_,
StoragePtr nested_storage_);
/// Start reading WAL from current_lsn position. Initial data sync from created snapshot already done.
void startSynchronization();
void stopSynchronization();
private:
/// Executed by wal_reader_task. A separate thread reads wal and advances lsn when rows were written via copyData.
void WALReaderFunc();
/// Executed by wal_reader_task. A separate thread reads the WAL and advances the LSN to the last committed position
/// after rows have been written via copyData.
void replicationStream();
void stopReplicationStream();
/// Start changes stream from WAL via copy command (up to max_block_size changes).
bool readFromReplicationSlot();
void decodeReplicationMessage(const char * replication_message, size_t size);
void insertValue(std::string & value, size_t column_idx);
void syncIntoTable(Block & block);
void advanceLSN(std::shared_ptr<pqxx::nontransaction> ntx);
/// Methods to parse replication message data.
void readTupleData(const char * message, size_t & pos, size_t size);
void readString(const char * message, size_t & pos, size_t size, String & result);
@ -64,9 +85,18 @@ private:
const std::string table_name;
PostgreSQLConnectionPtr connection, replication_connection;
LSNPosition current_lsn;
LSNPosition current_lsn, final_lsn;
BackgroundSchedulePool::TaskHolder wal_reader_task;
//BackgroundSchedulePool::TaskHolder table_sync_task;
std::atomic<bool> stop_synchronization = false;
const size_t max_block_size;
StoragePtr nested_storage;
Block sample_block;
ExternalResultDescription description;
MutableColumns columns;
/// Needed by insertPostgreSQLValue() to parse arrays.
std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;
};
}

View File

@ -1,38 +0,0 @@
#include "PostgreSQLReplicaConsumerBuffer.h"
namespace DB
{
PostgreSQLReplicaConsumerBuffer::PostgreSQLReplicaConsumerBuffer(
uint64_t max_block_size_)
: ReadBuffer(nullptr, 0)
, rows_data(max_block_size_)
{
}
PostgreSQLReplicaConsumerBuffer::~PostgreSQLReplicaConsumerBuffer()
{
BufferBase::set(nullptr, 0, 0);
}
bool PostgreSQLReplicaConsumerBuffer::nextImpl()
{
if (!allowed)
return false;
if (rows_data.tryPop(current_row_data))
{
auto * new_position = const_cast<char *>(current_row_data.data.data());
BufferBase::set(new_position, current_row_data.data.size(), 0);
allowed = false;
return true;
}
return false;
}
}

View File

@ -1,39 +0,0 @@
#pragma once
#include <Core/Names.h>
#include <common/types.h>
#include <IO/ReadBuffer.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <common/logger_useful.h>
#include "buffer_fwd.h"
namespace DB
{
class PostgreSQLReplicaConsumerBuffer : public ReadBuffer
{
public:
PostgreSQLReplicaConsumerBuffer(
uint64_t max_block_size_);
~PostgreSQLReplicaConsumerBuffer() override;
void allowNext() { allowed = true; }
private:
bool nextImpl() override;
struct RowData
{
String data;
RowData() : data("") {}
};
RowData current_row_data;
ConcurrentBoundedQueue<RowData> rows_data;
bool allowed = true;
};
}

View File

@ -28,13 +28,15 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const std::string & conn_str,
std::shared_ptr<Context> context_,
const std::string & publication_name_,
const std::string & replication_slot_name_)
const std::string & replication_slot_name_,
const size_t max_block_size_)
: log(&Poco::Logger::get("PostgreSQLReplicaHandler"))
, context(context_)
, database_name(database_name_)
, table_name(table_name_)
, publication_name(publication_name_)
, replication_slot(replication_slot_name_)
, max_block_size(max_block_size_)
, connection(std::make_shared<PostgreSQLConnection>(conn_str))
{
/// Create a replication connection, through which it is possible to execute only commands from streaming replication protocol
@ -56,7 +58,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
void PostgreSQLReplicationHandler::startup(StoragePtr storage)
{
helper_table = storage;
nested_storage = storage;
startup_task->activateAndSchedule();
}
@ -98,6 +100,7 @@ bool PostgreSQLReplicationHandler::isPublicationExist()
assert(!result.empty());
bool publication_exists = (result[0][0].as<std::string>() == "t");
/// TODO: check if publication is still valid?
if (publication_exists)
LOG_TRACE(log, "Publication {} already exists. Using existing version", publication_name);
@ -121,7 +124,7 @@ void PostgreSQLReplicationHandler::createPublication()
/// TODO: check replica identity
/// Requires a changed replica identity for the included table to be able to receive old values of updated rows.
/// (ALTER TABLE table_name REPLICA IDENTITY FULL)
/// (ALTER TABLE table_name REPLICA IDENTITY FULL ?)
}
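The TODO above concerns PostgreSQL's replica identity: without it, UPDATE and DELETE messages in the logical stream carry no old row values. A sketch of the statements involved, issued the way this handler runs SQL; createPublication()'s body is not visible in this hunk, so the CREATE PUBLICATION form is an assumption, and 'my_pub'/'my_table' are placeholders:

/// Limit the publication to the replicated table and switch its replica
/// identity so the stream includes old values of updated/deleted rows.
void setUpPublicationSketch(pqxx::nontransaction & ntx)
{
    ntx.exec("CREATE PUBLICATION my_pub FOR TABLE my_table");
    ntx.exec("ALTER TABLE my_table REPLICA IDENTITY FULL");
}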
@ -173,7 +176,9 @@ void PostgreSQLReplicationHandler::startReplication()
connection->conn_str(),
replication_slot,
publication_name,
start_lsn);
start_lsn,
max_block_size,
nested_storage);
LOG_DEBUG(log, "Committing replication transaction");
ntx->commit();
@ -203,12 +208,12 @@ void PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_name)
insert_context.makeQueryContext();
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = helper_table->getStorageID();
insert->table_id = nested_storage->getStorageID();
InterpreterInsertQuery interpreter(insert, insert_context);
auto block_io = interpreter.execute();
const StorageInMemoryMetadata & storage_metadata = helper_table->getInMemoryMetadata();
const StorageInMemoryMetadata & storage_metadata = nested_storage->getInMemoryMetadata();
auto sample_block = storage_metadata.getSampleBlockNonMaterialized();
PostgreSQLBlockInputStream input(std::move(stx), query_str, sample_block, DEFAULT_BLOCK_SIZE);
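loadFromSnapshot() builds the same INSERT pipeline that syncIntoTable() uses for replicated rows, but feeds it from a SELECT executed inside the exported snapshot's transaction. Gathered into one place as a sketch; the final copyData call is not visible in this hunk and is assumed from the syncIntoTable() pattern above:

/// Initial sync: stream the whole table out of the snapshot and insert it
/// into the nested ClickHouse table.
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = nested_storage->getStorageID();

InterpreterInsertQuery interpreter(insert, insert_context);
auto block_io = interpreter.execute();

auto sample_block = nested_storage->getInMemoryMetadata().getSampleBlockNonMaterialized();
PostgreSQLBlockInputStream input(std::move(stx), query_str, sample_block, DEFAULT_BLOCK_SIZE);

copyData(input, *block_io.out);   /// assumed: same call that syncIntoTable() uses above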
@ -296,10 +301,18 @@ void PostgreSQLReplicationHandler::dropReplicationSlot(NontransactionPtr ntx, st
}
void PostgreSQLReplicationHandler::dropPublication(NontransactionPtr ntx)
{
std::string query_str = fmt::format("DROP PUBLICATION IF EXISTS {}", publication_name);
ntx->exec(query_str);
}
/// Only used when MaterializePostgreSQL table is dropped.
void PostgreSQLReplicationHandler::checkAndDropReplicationSlot()
void PostgreSQLReplicationHandler::removeSlotAndPublication()
{
auto ntx = std::make_shared<pqxx::nontransaction>(*replication_connection->conn());
dropPublication(ntx);
if (isReplicationSlotExist(ntx, replication_slot))
dropReplicationSlot(ntx, replication_slot, false);
ntx->commit();

View File

@ -24,11 +24,12 @@ public:
const std::string & conn_str_,
std::shared_ptr<Context> context_,
const std::string & publication_slot_name_,
const std::string & replication_slot_name_);
const std::string & replication_slot_name_,
const size_t max_block_size_);
void startup(StoragePtr storage_);
void shutdown();
void checkAndDropReplicationSlot();
void removeSlotAndPublication();
private:
using NontransactionPtr = std::shared_ptr<pqxx::nontransaction>;
@ -41,6 +42,7 @@ private:
void createTempReplicationSlot(NontransactionPtr ntx, LSNPosition & start_lsn, std::string & snapshot_name);
void createReplicationSlot(NontransactionPtr ntx);
void dropReplicationSlot(NontransactionPtr tx, std::string & slot_name, bool use_replication_api);
void dropPublication(NontransactionPtr ntx);
void startReplication();
void loadFromSnapshot(std::string & snapshot_name);
@ -53,6 +55,7 @@ private:
std::string publication_name, replication_slot;
std::string temp_replication_slot;
const size_t max_block_size;
PostgreSQLConnectionPtr connection;
PostgreSQLConnectionPtr replication_connection;
@ -60,7 +63,7 @@ private:
BackgroundSchedulePool::TaskHolder startup_task;
std::shared_ptr<PostgreSQLReplicaConsumer> consumer;
StoragePtr helper_table;
StoragePtr nested_storage;
//LSNPosition start_lsn, final_lsn;
};

View File

@ -27,7 +27,6 @@
#include <Storages/StorageFactory.h>
#include "PostgreSQLReplicationSettings.h"
#include "PostgreSQLReplicaBlockInputStream.h"
#include <Databases/DatabaseOnDisk.h>
#include <common/logger_useful.h>
@ -61,13 +60,15 @@ StoragePostgreSQLReplica::StoragePostgreSQLReplica(
relative_data_path.resize(relative_data_path.size() - 1);
relative_data_path += nested_storage_suffix;
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
remote_database_name,
remote_table_name,
connection_str,
global_context,
global_context->getMacros()->expand(replication_settings->postgresql_replication_slot_name.value),
global_context->getMacros()->expand(replication_settings->postgresql_publication_name.value)
global_context->getMacros()->expand(replication_settings->postgresql_publication_name.value),
global_context->getSettingsRef().postgresql_replica_max_rows_to_insert.value
);
}
@ -180,12 +181,13 @@ void StoragePostgreSQLReplica::startup()
{
Context context_copy(*global_context);
const auto ast_create = getCreateHelperTableQuery();
auto table_id = getStorageID();
Poco::File path(relative_data_path);
if (!path.exists())
{
LOG_TRACE(&Poco::Logger::get("StoragePostgreSQLReplica"),
"Creating helper table {}", getStorageID().table_name + nested_storage_suffix);
"Creating helper table {}", table_id.table_name + nested_storage_suffix);
InterpreterCreateQuery interpreter(ast_create, context_copy);
interpreter.execute();
}
@ -193,8 +195,13 @@ void StoragePostgreSQLReplica::startup()
LOG_TRACE(&Poco::Logger::get("StoragePostgreSQLReplica"),
"Directory already exists {}", relative_data_path);
nested_storage = createTableFromAST(ast_create->as<const ASTCreateQuery &>(), getStorageID().database_name, relative_data_path, context_copy, false).second;
nested_storage->startup();
nested_storage = DatabaseCatalog::instance().getTable(
StorageID(table_id.database_name, table_id.table_name + nested_storage_suffix),
*global_context);
//nested_storage = createTableFromAST(
// ast_create->as<const ASTCreateQuery &>(), getStorageID().database_name, relative_data_path, context_copy, false).second;
//nested_storage->startup();
replication_handler->startup(nested_storage);
}
@ -208,8 +215,7 @@ void StoragePostgreSQLReplica::shutdown()
void StoragePostgreSQLReplica::shutdownFinal()
{
/// TODO: Under lock? Make sure synchronization stopped.
replication_handler->checkAndDropReplicationSlot();
replication_handler->removeSlotAndPublication();
dropNested();
}

View File

@ -19,7 +19,6 @@
#include <Interpreters/Context.h>
#include "PostgreSQLReplicationHandler.h"
#include "PostgreSQLReplicationSettings.h"
#include "buffer_fwd.h"
#include "pqxx/pqxx"
namespace DB
@ -46,6 +45,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
/// Called right after shutdown() in case of drop query
void shutdownFinal();
protected:

View File

@ -1,9 +0,0 @@
#pragma once
namespace DB
{
class PostgreSQLReplicaConsumerBuffer;
using ConsumerBufferPtr = std::shared_ptr<PostgreSQLReplicaConsumerBuffer>;
}

View File

@ -0,0 +1,208 @@
#include "insertPostgreSQLValue.h"
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Interpreters/convertFieldToType.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Common/assert_cast.h>
#include <ext/range.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
void insertPostgreSQLValue(
IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type,
std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t idx)
{
switch (type)
{
case ExternalResultDescription::ValueType::vtUInt8:
assert_cast<ColumnUInt8 &>(column).insertValue(pqxx::from_string<uint16_t>(value));
break;
case ExternalResultDescription::ValueType::vtUInt16:
assert_cast<ColumnUInt16 &>(column).insertValue(pqxx::from_string<uint16_t>(value));
break;
case ExternalResultDescription::ValueType::vtUInt32:
assert_cast<ColumnUInt32 &>(column).insertValue(pqxx::from_string<uint32_t>(value));
break;
case ExternalResultDescription::ValueType::vtUInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(pqxx::from_string<uint64_t>(value));
break;
case ExternalResultDescription::ValueType::vtInt8:
assert_cast<ColumnInt8 &>(column).insertValue(pqxx::from_string<int16_t>(value));
break;
case ExternalResultDescription::ValueType::vtInt16:
assert_cast<ColumnInt16 &>(column).insertValue(pqxx::from_string<int16_t>(value));
break;
case ExternalResultDescription::ValueType::vtInt32:
assert_cast<ColumnInt32 &>(column).insertValue(pqxx::from_string<int32_t>(value));
break;
case ExternalResultDescription::ValueType::vtInt64:
assert_cast<ColumnInt64 &>(column).insertValue(pqxx::from_string<int64_t>(value));
break;
case ExternalResultDescription::ValueType::vtFloat32:
assert_cast<ColumnFloat32 &>(column).insertValue(pqxx::from_string<float>(value));
break;
case ExternalResultDescription::ValueType::vtFloat64:
assert_cast<ColumnFloat64 &>(column).insertValue(pqxx::from_string<double>(value));
break;
case ExternalResultDescription::ValueType::vtFixedString:[[fallthrough]];
case ExternalResultDescription::ValueType::vtString:
assert_cast<ColumnString &>(column).insertData(value.data(), value.size());
break;
case ExternalResultDescription::ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
break;
case ExternalResultDescription::ValueType::vtDate:
assert_cast<ColumnUInt16 &>(column).insertValue(UInt16{LocalDate{std::string(value)}.getDayNum()});
break;
case ExternalResultDescription::ValueType::vtDateTime:
assert_cast<ColumnUInt32 &>(column).insertValue(time_t{LocalDateTime{std::string(value)}});
break;
case ExternalResultDescription::ValueType::vtDateTime64:[[fallthrough]];
case ExternalResultDescription::ValueType::vtDecimal32: [[fallthrough]];
case ExternalResultDescription::ValueType::vtDecimal64: [[fallthrough]];
case ExternalResultDescription::ValueType::vtDecimal128: [[fallthrough]];
case ExternalResultDescription::ValueType::vtDecimal256:
{
ReadBufferFromString istr(value);
data_type->deserializeAsWholeText(column, istr, FormatSettings{});
break;
}
case ExternalResultDescription::ValueType::vtArray:
{
pqxx::array_parser parser{value};
std::pair<pqxx::array_parser::juncture, std::string> parsed = parser.get_next();
size_t dimension = 0, max_dimension = 0, expected_dimensions = array_info[idx].num_dimensions;
const auto parse_value = array_info[idx].pqxx_parser;
std::vector<std::vector<Field>> dimensions(expected_dimensions + 1);
while (parsed.first != pqxx::array_parser::juncture::done)
{
if ((parsed.first == pqxx::array_parser::juncture::row_start) && (++dimension > expected_dimensions))
throw Exception("Got more dimensions than expected", ErrorCodes::BAD_ARGUMENTS);
else if (parsed.first == pqxx::array_parser::juncture::string_value)
dimensions[dimension].emplace_back(parse_value(parsed.second));
else if (parsed.first == pqxx::array_parser::juncture::null_value)
dimensions[dimension].emplace_back(array_info[idx].default_value);
else if (parsed.first == pqxx::array_parser::juncture::row_end)
{
max_dimension = std::max(max_dimension, dimension);
if (--dimension == 0)
break;
dimensions[dimension].emplace_back(Array(dimensions[dimension + 1].begin(), dimensions[dimension + 1].end()));
dimensions[dimension + 1].clear();
}
parsed = parser.get_next();
}
if (max_dimension < expected_dimensions)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Got less dimensions than expected. ({} instead of {})", max_dimension, expected_dimensions);
assert_cast<ColumnArray &>(column).insert(Array(dimensions[1].begin(), dimensions[1].end()));
break;
}
}
}
void preparePostgreSQLArrayInfo(
std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t column_idx, const DataTypePtr data_type)
{
const auto * array_type = typeid_cast<const DataTypeArray *>(data_type.get());
auto nested = array_type->getNestedType();
size_t count_dimensions = 1;
while (isArray(nested))
{
++count_dimensions;
nested = typeid_cast<const DataTypeArray *>(nested.get())->getNestedType();
}
Field default_value = nested->getDefault();
if (nested->isNullable())
nested = static_cast<const DataTypeNullable *>(nested.get())->getNestedType();
WhichDataType which(nested);
std::function<Field(std::string & fields)> parser;
if (which.isUInt8() || which.isUInt16())
parser = [](std::string & field) -> Field { return pqxx::from_string<uint16_t>(field); };
else if (which.isInt8() || which.isInt16())
parser = [](std::string & field) -> Field { return pqxx::from_string<int16_t>(field); };
else if (which.isUInt32())
parser = [](std::string & field) -> Field { return pqxx::from_string<uint32_t>(field); };
else if (which.isInt32())
parser = [](std::string & field) -> Field { return pqxx::from_string<int32_t>(field); };
else if (which.isUInt64())
parser = [](std::string & field) -> Field { return pqxx::from_string<uint64_t>(field); };
else if (which.isInt64())
parser = [](std::string & field) -> Field { return pqxx::from_string<int64_t>(field); };
else if (which.isFloat32())
parser = [](std::string & field) -> Field { return pqxx::from_string<float>(field); };
else if (which.isFloat64())
parser = [](std::string & field) -> Field { return pqxx::from_string<double>(field); };
else if (which.isString() || which.isFixedString())
parser = [](std::string & field) -> Field { return field; };
else if (which.isDate())
parser = [](std::string & field) -> Field { return UInt16{LocalDate{field}.getDayNum()}; };
else if (which.isDateTime())
parser = [](std::string & field) -> Field { return time_t{LocalDateTime{field}}; };
else if (which.isDecimal32())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal32> *>(nested.get());
DataTypeDecimal<Decimal32> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else if (which.isDecimal64())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal64> *>(nested.get());
DataTypeDecimal<Decimal64> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else if (which.isDecimal128())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal128> *>(nested.get());
DataTypeDecimal<Decimal128> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else if (which.isDecimal256())
parser = [nested](std::string & field) -> Field
{
const auto & type = typeid_cast<const DataTypeDecimal<Decimal256> *>(nested.get());
DataTypeDecimal<Decimal256> res(getDecimalPrecision(*type), getDecimalScale(*type));
return convertFieldToType(field, res);
};
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type conversion to {} is not supported", nested->getName());
array_info[column_idx] = {count_dimensions, default_value, parser};
}
}
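The vtArray branch relies on pqxx::array_parser, which reports a PostgreSQL array literal as a stream of junctures (row_start, string_value, null_value, row_end, done); the code above keeps one bucket per nesting depth and folds each finished row into its parent. A self-contained look at the juncture stream for a two-dimensional literal, assuming libpqxx 7:

#include <pqxx/pqxx>
#include <iostream>
#include <string>

int main()
{
    /// PostgreSQL text form of a two-dimensional integer array.
    const std::string literal = "{{1,2},{3,NULL}}";

    pqxx::array_parser parser{literal};
    for (auto item = parser.get_next();
         item.first != pqxx::array_parser::juncture::done;
         item = parser.get_next())
    {
        switch (item.first)
        {
            case pqxx::array_parser::juncture::row_start:    std::cout << "row_start\n"; break;
            case pqxx::array_parser::juncture::row_end:      std::cout << "row_end\n"; break;
            case pqxx::array_parser::juncture::string_value: std::cout << "value " << item.second << "\n"; break;
            case pqxx::array_parser::juncture::null_value:   std::cout << "null\n"; break;
            default: break;
        }
    }
    return 0;
}

Note that the outer braces produce their own row_start/row_end pair, which is exactly what the dimension counter in the code above is tracking.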

View File

@ -0,0 +1,29 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>
#include <Core/ExternalResultDescription.h>
#include <Core/Field.h>
#include <pqxx/pqxx>
namespace DB
{
struct PostgreSQLArrayInfo
{
size_t num_dimensions;
Field default_value;
std::function<Field(std::string & field)> pqxx_parser;
};
void insertPostgreSQLValue(
IColumn & column, std::string_view value,
const ExternalResultDescription::ValueType type, const DataTypePtr data_type,
std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t idx);
void preparePostgreSQLArrayInfo(
std::unordered_map<size_t, PostgreSQLArrayInfo> & array_info, size_t column_idx, const DataTypePtr data_type);
}
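These free functions replace the private members PostgreSQLBlockInputStream used to carry, so the new replication consumer can share them. The calling pattern, distilled from the two call sites in this commit (a fragment, not a complete translation unit; nullable columns are unwrapped first, as the insertValue() helpers above show):

ExternalResultDescription description;
std::unordered_map<size_t, PostgreSQLArrayInfo> array_info;

description.init(sample_block);

/// Once per array column: remember its dimensions, default value and pqxx parser.
for (const auto idx : ext::range(0, description.sample_block.columns()))
    if (description.types[idx].first == ExternalResultDescription::ValueType::vtArray)
        preparePostgreSQLArrayInfo(array_info, idx, description.sample_block.getByPosition(idx).type);

/// Per incoming text value: convert and append it to the matching column.
MutableColumns columns = description.sample_block.cloneEmptyColumns();
insertPostgreSQLValue(*columns[0], "42", description.types[0].first,
                      description.sample_block.getByPosition(0).type, array_info, 0);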

View File

@ -13,7 +13,7 @@ instance = cluster.add_instance('instance', main_configs=['configs/log_conf.xml'
postgres_table_template = """
CREATE TABLE IF NOT EXISTS {} (
key Integer NOT NULL, value Integer, PRIMARY KEY (key))
key Integer NOT NULL, value Integer)
"""
def get_postgres_conn(database=False):
@ -108,6 +108,70 @@ def test_no_connection_at_startup(started_cluster):
cursor.execute('DROP TABLE postgresql_replica;')
postgresql_replica_check_result(result, True)
def test_detach_attach_is_ok(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(50)")
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = PostgreSQLReplica(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
''')
result = instance.query('SELECT * FROM test.postgresql_replica;')
postgresql_replica_check_result(result, True)
instance.query('DETACH TABLE test.postgresql_replica')
instance.query('ATTACH TABLE test.postgresql_replica')
result = instance.query('SELECT * FROM test.postgresql_replica;')
cursor.execute('DROP TABLE postgresql_replica;')
postgresql_replica_check_result(result, True)
def test_replicating_inserts(started_cluster):
conn = get_postgres_conn(True)
cursor = conn.cursor()
create_postgres_table(cursor, 'postgresql_replica');
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT number, number from numbers(10)")
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = PostgreSQLReplica(
'postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key;
''')
result = instance.query('SELECT count() FROM test.postgresql_replica;')
assert(int(result) == 10)
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 10 + number, 10 + number from numbers(10)")
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 20 + number, 20 + number from numbers(10)")
time.sleep(4)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
assert(int(result) == 30)
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 30 + number, 30 + number from numbers(10)")
instance.query("INSERT INTO postgres_database.postgresql_replica SELECT 40 + number, 40 + number from numbers(10)")
time.sleep(4)
result = instance.query('SELECT count() FROM test.postgresql_replica;')
assert(int(result) == 50)
result = instance.query('SELECT * FROM test.postgresql_replica ORDER BY key;')
cursor.execute('DROP TABLE postgresql_replica;')
postgresql_replica_check_result(result, True)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")