Make common class for external storages

This commit is contained in:
kssenii 2021-03-27 17:12:47 +00:00
parent 95e8a8b9f0
commit f141f027f4
5 changed files with 120 additions and 83 deletions

View File

@ -15,3 +15,10 @@ services:
MYSQL_ROOT_PASSWORD: clickhouse
ports:
- 3388:3306
mysql3:
image: mysql:5.7
restart: always
environment:
MYSQL_ROOT_PASSWORD: clickhouse
ports:
- 3368:3306

View File

@ -1,4 +1,4 @@
#include "StorageMySQLDistributed.h"
#include "StorageExternalDistributed.h"
#if USE_MYSQL
@ -20,6 +20,8 @@
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
#include <Common/parseRemoteDescription.h>
#include <Storages/StorageMySQL.h>
#include <common/logger_useful.h>
namespace DB
@ -31,107 +33,116 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
StorageMySQLDistributed::StorageMySQLDistributed(
StorageExternalDistributed::StorageExternalDistributed(
const StorageID & table_id_,
const String & remote_database_,
const String & remote_table_,
const String & engine_name,
const String & cluster_description,
const String & remote_database,
const String & remote_table,
const String & username,
const String & password,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_)
const Context & context)
: IStorage(table_id_)
, remote_database(remote_database_)
, remote_table(remote_table_)
, context(context_.getGlobalContext())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
/// TODO: add proper setting
size_t max_addresses = context.getSettingsRef().table_function_remote_max_addresses;
/// Split into shards
std::vector<String> shards_descriptions = parseRemoteDescription(cluster_description, 0, cluster_description.size(), ',', max_addresses);
/// For each shard make a connection pool of its replicas; replicas are parsed in the pool
/// For each shard pass replicas description into storage, replicas are managed by storage's PoolWithFailover.
for (const auto & shard_description : shards_descriptions)
{
/// Parse shard description like host-{01..02}-{1|2|3}:port, into host_description (host-01-{1..2}-{1|2|3}) and port
auto parsed_host_port = parseAddress(shard_description, 3306);
shards.emplace(std::make_shared<mysqlxx::PoolWithFailover>(remote_database, parsed_host_port.first, parsed_host_port.second, username, password));
/// Parse shard description like host-{01..02}-{1|2|3}:port, into host_description_pattern (host-01-{1..2}-{1|2|3}) and port
/// TODO: add setting with default port
LOG_TRACE(&Poco::Logger::get("StorageExternalDistributed"), "Adding shard description: {}", shard_description);
auto parsed_shard_description = parseAddress(shard_description, 3306);
StoragePtr shard;
if (engine_name == "MySQL")
{
mysqlxx::PoolWithFailover pool(remote_database, parsed_shard_description.first, parsed_shard_description.second, username, password);
shard = StorageMySQL::create(
table_id_, std::move(pool), remote_database, remote_table,
/* replace_query = */ false, /* on_duplicate_clause = */ "",
columns_, constraints_, context);
}
else if (engine_name == "PostgreSQL")
{
}
else
{
throw Exception(
"External storage engine {} is not supported for StorageExternalDistributed. Supported engines are: MySQL, PostgreSQL",
ErrorCodes::BAD_ARGUMENTS);
}
shards.emplace(std::move(shard));
}
}
Pipe StorageMySQLDistributed::read(
const Names & column_names_,
Pipe StorageExternalDistributed::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info_,
const Context & context_,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size_,
unsigned)
SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
metadata_snapshot->check(column_names_, getVirtuals(), getStorageID());
String query = transformQueryForExternalDatabase(
query_info_,
metadata_snapshot->getColumns().getOrdinary(),
IdentifierQuotingStyle::BackticksMySQL,
remote_database,
remote_table,
context_);
Block sample_block;
for (const String & column_name : column_names_)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(column_name);
WhichDataType which(column_data.type);
/// Convert enum to string.
if (which.isEnum())
column_data.type = std::make_shared<DataTypeString>();
sample_block.insert({ column_data.type, column_data.name });
}
Pipes pipes;
for (const auto & shard : shards)
{
pipes.emplace_back(std::make_shared<SourceFromInputStream>(
std::make_shared<MySQLWithFailoverBlockInputStream>(shard, query, sample_block, max_block_size_, /* auto_close = */ true)));
pipes.emplace_back(shard->read(
column_names,
metadata_snapshot,
query_info,
context,
processed_stage,
max_block_size,
num_streams
));
}
return Pipe::unitePipes(std::move(pipes));
}
void registerStorageMySQLDistributed(StorageFactory & factory)
void registerStorageExternalDistributed(StorageFactory & factory)
{
factory.registerStorage("MySQLDistributed", [](const StorageFactory::Arguments & args)
factory.registerStorage("ExternalDistributed", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 5)
if (engine_args.size() != 6)
throw Exception(
"Storage MySQLiDistributed requires 5 parameters: MySQLDistributed('cluster_description', database, table, 'user', 'password').",
"Storage MySQLiDistributed requires 5 parameters: ExternalDistributed('engine_name', 'cluster_description', database, table, 'user', 'password').",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.local_context);
const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const String & cluster_description = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & engine_name = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
const String & cluster_description = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
const String & remote_database = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const String & remote_table = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & username = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
const String & password = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>();
return StorageMySQLDistributed::create(
return StorageExternalDistributed::create(
args.table_id,
engine_name,
cluster_description,
remote_database,
remote_table,
cluster_description,
username,
password,
args.columns,

View File

@ -14,12 +14,14 @@
namespace DB
{
class StorageMySQLDistributed final : public ext::shared_ptr_helper<StorageMySQLDistributed>, public DB::IStorage
/// Storages MySQL and PostgreSQL use ConnectionPoolWithFailover and support multiple replicas.
/// This is a class which unites multiple storages with replicas into multiple shards with replicas.
class StorageExternalDistributed final : public ext::shared_ptr_helper<StorageExternalDistributed>, public DB::IStorage
{
friend struct ext::shared_ptr_helper<StorageMySQLDistributed>;
friend struct ext::shared_ptr_helper<StorageExternalDistributed>;
public:
std::string getName() const override { return "MySQLDistributed"; }
std::string getName() const override { return "ExternalDistributed"; }
Pipe read(
const Names & column_names,
@ -31,11 +33,12 @@ public:
unsigned num_streams) override;
protected:
StorageMySQLDistributed(
StorageExternalDistributed(
const StorageID & table_id_,
const String & remote_database__,
const String & remote_table_,
const String & engine_name_,
const String & cluster_description,
const String & remote_database_,
const String & remote_table_,
const String & username,
const String & password,
const ColumnsDescription & columns_,
@ -43,13 +46,7 @@ protected:
const Context & context_);
private:
using Replicas = std::shared_ptr<mysqlxx::PoolWithFailover>;
using Shards = std::unordered_set<Replicas>;
const std::string remote_database;
const std::string remote_table;
const Context & context;
size_t shard_count;
using Shards = std::unordered_set<StoragePtr>;
Shards shards;
};

View File

@ -62,6 +62,10 @@ void registerStorageEmbeddedRocksDB(StorageFactory & factory);
void registerStoragePostgreSQL(StorageFactory & factory);
#endif
#if USE_MYSQL || USE_LIBPQXX
void registerStorageExternalDistributed(StorageFactory & factory);
#endif
void registerStorages()
{
auto & factory = StorageFactory::instance();
@ -118,6 +122,10 @@ void registerStorages()
#if USE_LIBPQXX
registerStoragePostgreSQL(factory);
#endif
#if USE_MYSQL || USE_LIBPQXX
registerStorageExternalDistributed(factory);
#endif
}
}

View File

@ -42,8 +42,6 @@ def started_cluster():
## create mysql db and table
conn1 = get_mysql_conn(port=3308)
create_mysql_db(conn1, 'clickhouse')
conn2 = get_mysql_conn(port=3388)
create_mysql_db(conn2, 'clickhouse')
yield cluster
finally:
@ -188,39 +186,55 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, source Enum8('
conn.close()
def test_mysql_many_replicas(started_cluster):
def test_mysql_distributed(started_cluster):
table_name = 'test_replicas'
conn1 = get_mysql_conn(port=3308)
create_mysql_table(conn1, table_name)
conn2 = get_mysql_conn(port=3388)
create_mysql_table(conn2, table_name)
# Storage with mysql{1|2}
conn2 = get_mysql_conn(port=3388)
create_mysql_db(conn2, 'clickhouse')
create_mysql_table(conn2, table_name)
conn3 = get_mysql_conn(port=3368)
create_mysql_db(conn3, 'clickhouse')
create_mysql_table(conn3, table_name)
# Storage with three replicas: mysql1:3306, mysql2:3306 and mysql3:3306
node1.query('''
CREATE TABLE test_replicas
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = MySQL(`mysql{1|2}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse'); ''')
ENGINE = MySQL(`mysql{1|2|3}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse'); ''')
# Fill remote tables with different data to be able to check
node1.query('''
CREATE TABLE test_replica1
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = MySQL(`mysql1:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');''')
node1.query('''
CREATE TABLE test_replica2
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = MySQL(`mysql2:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse'); ''')
node1.query("INSERT INTO test_replica1 (id, name) SELECT number, 'host1' from numbers(10) ")
node1.query("INSERT INTO test_replica2 (id, name) SELECT number, 'host2' from numbers(10) ")
for i in range(1, 4):
node1.query('''
CREATE TABLE test_replica{}
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = MySQL(`mysql{}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');'''.format(i, i))
node1.query("INSERT INTO test_replica{} (id, name) SELECT number, 'host{}' from numbers(10) ".format(i, i))
# check all remote replicas are accessible through that table
query = "SELECT * FROM ("
for i in range (2):
for i in range (3):
query += "SELECT name FROM test_replicas UNION DISTINCT "
query += "SELECT name FROM test_replicas)"
result = node1.query(query.format(t=table_name))
assert(result == 'host1\nhost2\n')
assert(result == 'host1\nhost2\nhost3\n')
# Storage with two shards, each has 2 replicas
node1.query('''
CREATE TABLE test_shards
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = ExternalDistributed('MySQL', `mysql{1|2}:3306,mysql{3|4}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse'); ''')
# check remote replicas are accessible through that table
query = "SELECT name FROM ("
for i in range (2):
query += "SELECT name FROM test_shards UNION DISTINCT "
query += "SELECT name FROM test_shards) ORDER BY name"
result = node1.query(query.format(t=table_name))
assert(result == 'host1\nhost2\nhost3\n')
if __name__ == '__main__':