Better comments

This commit is contained in:
kssenii 2021-03-27 20:49:26 +00:00
parent 22b515fbc9
commit ef537b802f
8 changed files with 35 additions and 37 deletions

View File

@ -5,10 +5,16 @@
#include <mysqlxx/PoolWithFailover.h>
#include <Common/parseRemoteDescription.h>
#include <IO/ReadHelpers.h>
#include <common/logger_useful.h>
/// Checks whether `s` begins with the NUL-terminated string `prefix`.
/// This is a local copy of the helper from StringUtils.h, kept here to avoid extra dependencies.
static bool startsWith(const std::string & s, const char * prefix)
{
    const auto prefix_length = strlen(prefix);

    /// A string shorter than the prefix can never start with it.
    if (s.size() < prefix_length)
        return false;

    return memcmp(s.data(), prefix, prefix_length) == 0;
}
using namespace mysqlxx;
PoolWithFailover::PoolWithFailover(
@ -83,9 +89,9 @@ PoolWithFailover::PoolWithFailover(
auto hosts = DB::parseRemoteDescription(hosts_pattern, 0, hosts_pattern.size(), '|', max_addresses);
for (const auto & host : hosts)
{
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue after each fetch.
/// Replicas have the same priority, but traversed replicas (with failed connection) are moved to the end of the queue.
replicas_by_priority[0].emplace_back(std::make_shared<Pool>(database, host, user, password, port));
LOG_TRACE(&Poco::Logger::get("MySQLPoolWithFailover"), "Adding address {}:{} to pool", host, port);
LOG_TRACE(&Poco::Logger::get("MySQLPoolWithFailover"), "Adding address {}:{} to MySQL pool", host, port);
}
}

View File

@ -71,13 +71,10 @@ namespace mysqlxx
using PoolPtr = std::shared_ptr<Pool>;
using Replicas = std::vector<PoolPtr>;
/// MySQL dictionary source related priority:
/// [priority][index] -> replica. Highest priority is 0.
using ReplicasByPriority = std::map<int, Replicas>;
ReplicasByPriority replicas_by_priority;
/// MySQL storage related priority:
/// Number of connection tries.
size_t max_tries;
/// Mutex for set of replicas.

View File

@ -57,7 +57,7 @@ MySQLBlockInputStream::MySQLBlockInputStream(
initPositionMappingFromQueryResultStructure();
}
/// As a base class constructor for MySQLWithFailoverBlockInputStream.
/// For descendant MySQLWithFailoverBlockInputStream
MySQLBlockInputStream::MySQLBlockInputStream(
const Block & sample_block_,
UInt64 max_block_size_,
@ -89,7 +89,7 @@ MySQLWithFailoverBlockInputStream::MySQLWithFailoverBlockInputStream(
void MySQLWithFailoverBlockInputStream::readPrefix()
{
size_t count_connection_lost = 0;
size_t count_connect_attempts = 0;
/// For recovering from "Lost connection to MySQL server during query" errors
while (true)
@ -101,14 +101,12 @@ void MySQLWithFailoverBlockInputStream::readPrefix()
}
catch (const mysqlxx::ConnectionLost & ecl) /// There are two retriable failures: CR_SERVER_GONE_ERROR, CR_SERVER_LOST
{
LOG_WARNING(log, ecl.displayText());
LOG_WARNING(log, "Failed connection ({}/{}). Trying to reconnect... (Info: {})", count_connect_attempts, max_tries, ecl.displayText());
}
LOG_WARNING(log, "Lost connection ({}/{}). Trying to reconnect...", count_connection_lost, max_tries);
if (++count_connection_lost > max_tries)
if (++count_connect_attempts > max_tries)
{
LOG_ERROR(log, "Failed to create connection to MySQL. ({}/{})", count_connection_lost, max_tries);
LOG_ERROR(log, "Failed to create connection to MySQL. ({}/{})", count_connect_attempts, max_tries);
throw;
}
}
@ -201,11 +199,8 @@ Block MySQLBlockInputStream::readImpl()
if (!row)
{
if (auto_close)
{
LOG_TRACE(log, "Removing connection, kssenii");
connection->entry.disconnect();
}
LOG_TRACE(log, "Connection disconnected kssenii");
return {};
}

View File

@ -51,9 +51,9 @@ protected:
ExternalResultDescription description;
};
/// Like MySQLBlockInputStream, but allocates connection only when reading is starting
/// and retries in case of connection errors. It allows to create a lot of stream objects
/// without occupation of all connection pool.
/// Like MySQLBlockInputStream, but allocates connection only when reading is starting.
/// It allows to create a lot of stream objects without occupation of all connection pool.
/// Also makes attempts to reconnect in case of connection failures.
class MySQLWithFailoverBlockInputStream final : public MySQLBlockInputStream
{
public:

View File

@ -40,7 +40,7 @@ void Connection::connectIfNeeded()
if (!connection || !connection->is_open())
{
LOG_DEBUG(&Poco::Logger::get("PostgreSQLConnection"), "New connection to {}", getAddress());
connection = std::make_unique<pqxx::connection>(connection_str);
connection = std::make_shared<pqxx::connection>(connection_str);
}
}

View File

@ -1,23 +1,16 @@
#include "StorageExternalDistributed.h"
#if USE_MYSQL
#if USE_MYSQL || USE_LIBPQXX
#include <Storages/StorageFactory.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Formats/MySQLBlockInputStream.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Common/parseAddress.h>
#include <IO/Operators.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ASTLiteral.h>
#include <Common/parseAddress.h>
#include <mysqlxx/Transaction.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
#include <Common/parseRemoteDescription.h>
#include <Storages/StorageMySQL.h>

View File

@ -4,18 +4,23 @@
#include "config_core.h"
#endif
#if USE_MYSQL
#if USE_MYSQL || USE_LIBPQXX
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <mysqlxx/PoolWithFailover.h>
namespace DB
{
/// Storages MySQL and PostgreSQL use ConnectionPoolWithFailover and support multiple replicas.
/// This is a class which unites multiple storages with replicas into multiple shards with replicas.
/// This class unites multiple storages with replicas into multiple shards with replicas.
/// A query to external database is passed to one replica on each shard, the result is united.
/// Replicas on each shard have the same priority, unavailable replicas are moved to the end of
/// the queue. The queue is shuffled from time to time.
/// TODO: try `load_balancing` setting for replicas priorities same way as for table function `remote`
class StorageExternalDistributed final : public ext::shared_ptr_helper<StorageExternalDistributed>, public DB::IStorage
{
friend struct ext::shared_ptr_helper<StorageExternalDistributed>;

View File

@ -188,17 +188,19 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, source Enum8('
def test_mysql_distributed(started_cluster):
table_name = 'test_replicas'
conn1 = get_mysql_conn(port=3308)
create_mysql_table(conn1, table_name)
conn1 = get_mysql_conn(port=3308)
conn2 = get_mysql_conn(port=3388)
create_mysql_db(conn2, 'clickhouse')
create_mysql_table(conn2, table_name)
conn3 = get_mysql_conn(port=3368)
create_mysql_db(conn2, 'clickhouse')
create_mysql_db(conn3, 'clickhouse')
create_mysql_table(conn1, table_name)
create_mysql_table(conn2, table_name)
create_mysql_table(conn3, table_name)
# Storage with two replicas mysql1:3306 and mysql2:3306
# Storage with 3 replicas
node1.query('''
CREATE TABLE test_replicas
(id UInt32, name String, age UInt32, money UInt32)
@ -212,7 +214,7 @@ def test_mysql_distributed(started_cluster):
ENGINE = MySQL(`mysql{}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');'''.format(i, i))
node1.query("INSERT INTO test_replica{} (id, name) SELECT number, 'host{}' from numbers(10) ".format(i, i))
# check both remote replicas are accessible through that table
# check all remote replicas are accessible through that table
query = "SELECT * FROM ("
for i in range (3):
query += "SELECT name FROM test_replicas UNION DISTINCT "