mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
Merge pull request #9409 from clemrodriguez/mysqlxx-share-connection
Mysqlxx share connection for external dictionaries only
This commit is contained in:
commit
3645375f71
@ -8,6 +8,7 @@ add_library (mysqlxx
|
||||
src/Row.cpp
|
||||
src/Value.cpp
|
||||
src/Pool.cpp
|
||||
src/PoolFactory.cpp
|
||||
src/PoolWithFailover.cpp
|
||||
|
||||
include/mysqlxx/Connection.h
|
||||
@ -15,6 +16,7 @@ add_library (mysqlxx
|
||||
include/mysqlxx/mysqlxx.h
|
||||
include/mysqlxx/Null.h
|
||||
include/mysqlxx/Pool.h
|
||||
include/mysqlxx/PoolFactory.h
|
||||
include/mysqlxx/PoolWithFailover.h
|
||||
include/mysqlxx/Query.h
|
||||
include/mysqlxx/ResultBase.h
|
||||
|
@ -198,6 +198,8 @@ public:
|
||||
return description;
|
||||
}
|
||||
|
||||
void removeConnection(Connection* data);
|
||||
|
||||
protected:
|
||||
/// Number of MySQL connections which are created at launch.
|
||||
unsigned default_connections;
|
||||
|
55
base/mysqlxx/include/mysqlxx/PoolFactory.h
Normal file
55
base/mysqlxx/include/mysqlxx/PoolFactory.h
Normal file
@ -0,0 +1,55 @@
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
#include <boost/noncopyable.hpp>
|
||||
|
||||
#include <mysqlxx/PoolWithFailover.h>
|
||||
|
||||
|
||||
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS 1
|
||||
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS 16
|
||||
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
|
||||
|
||||
|
||||
namespace mysqlxx
|
||||
{
|
||||
|
||||
/*
|
||||
* PoolFactory.h
|
||||
* This class is a helper singleton to mutualize connections to MySQL.
|
||||
*/
|
||||
class PoolFactory final : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
static PoolFactory & instance();
|
||||
|
||||
PoolFactory(const PoolFactory &) = delete;
|
||||
|
||||
/** Allocates a PoolWithFailover to connect to MySQL. */
|
||||
PoolWithFailover Get(const std::string & config_name,
|
||||
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
|
||||
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
|
||||
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
|
||||
|
||||
/** Allocates a PoolWithFailover to connect to MySQL. */
|
||||
PoolWithFailover Get(const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_name,
|
||||
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
|
||||
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
|
||||
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
|
||||
|
||||
void reset();
|
||||
|
||||
|
||||
~PoolFactory() = default;
|
||||
PoolFactory& operator=(const PoolFactory &) = delete;
|
||||
|
||||
private:
|
||||
PoolFactory();
|
||||
|
||||
struct Impl;
|
||||
std::unique_ptr<Impl> impl;
|
||||
};
|
||||
|
||||
}
|
@ -77,6 +77,10 @@ namespace mysqlxx
|
||||
size_t max_tries;
|
||||
/// Mutex for set of replicas.
|
||||
std::mutex mutex;
|
||||
std::string config_name;
|
||||
|
||||
/// Can the Pool be shared
|
||||
bool shareable;
|
||||
|
||||
public:
|
||||
using Entry = Pool::Entry;
|
||||
@ -100,8 +104,6 @@ namespace mysqlxx
|
||||
|
||||
PoolWithFailover(const PoolWithFailover & other);
|
||||
|
||||
PoolWithFailover & operator=(const PoolWithFailover &) = delete;
|
||||
|
||||
/** Allocates a connection to use. */
|
||||
Entry Get();
|
||||
};
|
||||
|
@ -239,7 +239,8 @@ template <> inline bool Value::get<bool >() cons
|
||||
template <> inline char Value::get<char >() const { return getInt(); }
|
||||
template <> inline signed char Value::get<signed char >() const { return getInt(); }
|
||||
template <> inline unsigned char Value::get<unsigned char >() const { return getUInt(); }
|
||||
template <> inline char8_t Value::get<char8_t >() const { return getUInt(); }
|
||||
// crodriguez uncomment
|
||||
//template <> inline char8_t Value::get<char8_t >() const { return getUInt(); }
|
||||
template <> inline short Value::get<short >() const { return getInt(); }
|
||||
template <> inline unsigned short Value::get<unsigned short >() const { return getUInt(); }
|
||||
template <> inline int Value::get<int >() const { return getInt(); }
|
||||
|
@ -21,16 +21,21 @@ void Pool::Entry::incrementRefCount()
|
||||
{
|
||||
if (!data)
|
||||
return;
|
||||
++data->ref_count;
|
||||
mysql_thread_init();
|
||||
++(data->ref_count);
|
||||
if (data->ref_count==1)
|
||||
mysql_thread_init();
|
||||
}
|
||||
|
||||
void Pool::Entry::decrementRefCount()
|
||||
{
|
||||
if (!data)
|
||||
return;
|
||||
--data->ref_count;
|
||||
mysql_thread_end();
|
||||
if (data->ref_count > 0)
|
||||
{
|
||||
--(data->ref_count);
|
||||
if (data->ref_count==0)
|
||||
mysql_thread_end();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -169,14 +174,24 @@ Pool::Entry Pool::tryGet()
|
||||
return Entry();
|
||||
}
|
||||
|
||||
void Pool::removeConnection(Connection* connection)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
if (connection)
|
||||
{
|
||||
if (connection->ref_count > 0)
|
||||
{
|
||||
connection->conn.disconnect();
|
||||
connection->ref_count = 0;
|
||||
}
|
||||
connections.remove(connection);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void Pool::Entry::disconnect()
|
||||
{
|
||||
if (data)
|
||||
{
|
||||
decrementRefCount();
|
||||
data->conn.disconnect();
|
||||
}
|
||||
pool->removeConnection(data);
|
||||
}
|
||||
|
||||
|
||||
|
122
base/mysqlxx/src/PoolFactory.cpp
Normal file
122
base/mysqlxx/src/PoolFactory.cpp
Normal file
@ -0,0 +1,122 @@
|
||||
#include <mysqlxx/PoolFactory.h>
|
||||
#include <Poco/Util/Application.h>
|
||||
#include <Poco/Util/LayeredConfiguration.h>
|
||||
|
||||
namespace mysqlxx
|
||||
{
|
||||
|
||||
struct PoolFactory::Impl
|
||||
{
|
||||
// Cache of already affected pools identified by their config name
|
||||
std::map<std::string, std::shared_ptr<PoolWithFailover>> pools;
|
||||
|
||||
// Cache of Pool ID (host + port + user +...) cibling already established shareable pool
|
||||
std::map<std::string, std::string> pools_by_ids;
|
||||
|
||||
/// Protect pools and pools_by_ids caches
|
||||
std::mutex mutex;
|
||||
};
|
||||
|
||||
PoolWithFailover PoolFactory::Get(const std::string & config_name, unsigned default_connections,
|
||||
unsigned max_connections, size_t max_tries)
|
||||
{
|
||||
return Get(Poco::Util::Application::instance().config(), config_name, default_connections, max_connections, max_tries);
|
||||
}
|
||||
|
||||
/// Duplicate of code from StringUtils.h. Copied here for less dependencies.
|
||||
static bool startsWith(const std::string & s, const char * prefix)
|
||||
{
|
||||
return s.size() >= strlen(prefix) && 0 == memcmp(s.data(), prefix, strlen(prefix));
|
||||
}
|
||||
|
||||
static std::string getPoolEntryName(const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_name)
|
||||
{
|
||||
bool shared = config.getBool(config_name + ".share_connection", false);
|
||||
|
||||
// Not shared no need to generate a name the pool won't be stored
|
||||
if (!shared)
|
||||
return "";
|
||||
|
||||
std::string entry_name = "";
|
||||
std::string host = config.getString(config_name + ".host", "");
|
||||
std::string port = config.getString(config_name + ".port", "");
|
||||
std::string user = config.getString(config_name + ".user", "");
|
||||
std::string db = config.getString(config_name + ".db", "");
|
||||
std::string table = config.getString(config_name + ".table", "");
|
||||
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(config_name, keys);
|
||||
|
||||
if (config.has(config_name + ".replica"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys replica_keys;
|
||||
config.keys(config_name, replica_keys);
|
||||
for (const auto & replica_config_key : replica_keys)
|
||||
{
|
||||
/// There could be another elements in the same level in configuration file, like "user", "port"...
|
||||
if (startsWith(replica_config_key, "replica"))
|
||||
{
|
||||
std::string replica_name = config_name + "." + replica_config_key;
|
||||
std::string tmp_host = config.getString(replica_name + ".host", host);
|
||||
std::string tmp_port = config.getString(replica_name + ".port", port);
|
||||
std::string tmp_user = config.getString(replica_name + ".user", user);
|
||||
entry_name += (entry_name.empty() ? "" : "|") + tmp_user + "@" + tmp_host + ":" + tmp_port + "/" + db;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
entry_name = user + "@" + host + ":" + port + "/" + db;
|
||||
}
|
||||
return entry_name;
|
||||
}
|
||||
|
||||
PoolWithFailover PoolFactory::Get(const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_name, unsigned default_connections, unsigned max_connections, size_t max_tries)
|
||||
{
|
||||
|
||||
std::lock_guard<std::mutex> lock(impl->mutex);
|
||||
if (auto entry = impl->pools.find(config_name); entry != impl->pools.end())
|
||||
{
|
||||
return *(entry->second.get());
|
||||
}
|
||||
else
|
||||
{
|
||||
std::string entry_name = getPoolEntryName(config, config_name);
|
||||
if (auto id = impl->pools_by_ids.find(entry_name); id != impl->pools_by_ids.end())
|
||||
{
|
||||
entry = impl->pools.find(id->second);
|
||||
std::shared_ptr<PoolWithFailover> pool = entry->second;
|
||||
impl->pools.insert_or_assign(config_name, pool);
|
||||
return *pool;
|
||||
}
|
||||
|
||||
auto pool = std::make_shared<PoolWithFailover>(config, config_name, default_connections, max_connections, max_tries);
|
||||
// Check the pool will be shared
|
||||
if (!entry_name.empty())
|
||||
{
|
||||
// Store shared pool
|
||||
impl->pools.insert_or_assign(config_name, pool);
|
||||
impl->pools_by_ids.insert_or_assign(entry_name, config_name);
|
||||
}
|
||||
return *(pool.get());
|
||||
}
|
||||
}
|
||||
|
||||
void PoolFactory::reset()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(impl->mutex);
|
||||
impl->pools.clear();
|
||||
impl->pools_by_ids.clear();
|
||||
}
|
||||
|
||||
PoolFactory::PoolFactory() : impl(std::make_unique<PoolFactory::Impl>()) {}
|
||||
|
||||
PoolFactory & PoolFactory::instance()
|
||||
{
|
||||
static PoolFactory ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
@ -15,6 +15,7 @@ PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & cfg
|
||||
const unsigned max_connections, const size_t max_tries)
|
||||
: max_tries(max_tries)
|
||||
{
|
||||
shareable = cfg.getBool(config_name + ".share_connection", false);
|
||||
if (cfg.has(config_name + ".replica"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys replica_keys;
|
||||
@ -48,15 +49,22 @@ PoolWithFailover::PoolWithFailover(const std::string & config_name, const unsign
|
||||
{}
|
||||
|
||||
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
|
||||
: max_tries{other.max_tries}
|
||||
: max_tries{other.max_tries}, config_name{other.config_name}, shareable{other.shareable}
|
||||
{
|
||||
for (const auto & priority_replicas : other.replicas_by_priority)
|
||||
if (shareable)
|
||||
{
|
||||
Replicas replicas;
|
||||
replicas.reserve(priority_replicas.second.size());
|
||||
for (const auto & pool : priority_replicas.second)
|
||||
replicas.emplace_back(std::make_shared<Pool>(*pool));
|
||||
replicas_by_priority.emplace(priority_replicas.first, std::move(replicas));
|
||||
replicas_by_priority = other.replicas_by_priority;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (const auto & priority_replicas : other.replicas_by_priority)
|
||||
{
|
||||
Replicas replicas;
|
||||
replicas.reserve(priority_replicas.second.size());
|
||||
for (const auto & pool : priority_replicas.second)
|
||||
replicas.emplace_back(std::make_shared<Pool>(*pool));
|
||||
replicas_by_priority.emplace(priority_replicas.first, std::move(replicas));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -81,7 +89,7 @@ PoolWithFailover::Entry PoolWithFailover::Get()
|
||||
|
||||
try
|
||||
{
|
||||
Entry entry = pool->tryGet();
|
||||
Entry entry = shareable ? pool->Get() : pool->tryGet();
|
||||
|
||||
if (!entry.isNull())
|
||||
{
|
||||
|
@ -46,6 +46,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
|
||||
# include <common/logger_useful.h>
|
||||
# include <Formats/MySQLBlockInputStream.h>
|
||||
# include "readInvalidateQuery.h"
|
||||
# include <mysqlxx/PoolFactory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -66,11 +67,11 @@ MySQLDictionarySource::MySQLDictionarySource(
|
||||
, update_field{config.getString(config_prefix + ".update_field", "")}
|
||||
, dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)}
|
||||
, sample_block{sample_block_}
|
||||
, pool{config, config_prefix}
|
||||
, pool{mysqlxx::PoolFactory::instance().Get(config, config_prefix)}
|
||||
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
|
||||
, load_all_query{query_builder.composeLoadAllQuery()}
|
||||
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
|
||||
, close_connection{config.getBool(config_prefix + ".close_connection", false)}
|
||||
, close_connection{config.getBool(config_prefix + ".close_connection", false) || config.getBool(config_prefix + ".share_connection", false)}
|
||||
{
|
||||
}
|
||||
|
||||
@ -114,19 +115,21 @@ std::string MySQLDictionarySource::getUpdateFieldAndDate()
|
||||
|
||||
BlockInputStreamPtr MySQLDictionarySource::loadAll()
|
||||
{
|
||||
last_modification = getLastModification();
|
||||
auto connection = pool.Get();
|
||||
last_modification = getLastModification(connection, false);
|
||||
|
||||
LOG_TRACE(log, load_all_query);
|
||||
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query, sample_block, max_block_size, close_connection);
|
||||
return std::make_shared<MySQLBlockInputStream>(connection, load_all_query, sample_block, max_block_size, close_connection);
|
||||
}
|
||||
|
||||
BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
|
||||
{
|
||||
last_modification = getLastModification();
|
||||
auto connection = pool.Get();
|
||||
last_modification = getLastModification(connection, false);
|
||||
|
||||
std::string load_update_query = getUpdateFieldAndDate();
|
||||
LOG_TRACE(log, load_update_query);
|
||||
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_update_query, sample_block, max_block_size, close_connection);
|
||||
return std::make_shared<MySQLBlockInputStream>(connection, load_update_query, sample_block, max_block_size, close_connection);
|
||||
}
|
||||
|
||||
BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
|
||||
@ -158,8 +161,8 @@ bool MySQLDictionarySource::isModified() const
|
||||
|
||||
if (dont_check_update_time)
|
||||
return true;
|
||||
|
||||
return getLastModification() > last_modification;
|
||||
auto connection = pool.Get();
|
||||
return getLastModification(connection, true) > last_modification;
|
||||
}
|
||||
|
||||
bool MySQLDictionarySource::supportsSelectiveLoad() const
|
||||
@ -199,7 +202,7 @@ std::string MySQLDictionarySource::quoteForLike(const std::string s)
|
||||
return out.str();
|
||||
}
|
||||
|
||||
LocalDateTime MySQLDictionarySource::getLastModification() const
|
||||
LocalDateTime MySQLDictionarySource::getLastModification(mysqlxx::Pool::Entry & connection, bool allow_connection_closure) const
|
||||
{
|
||||
LocalDateTime modification_time{std::time(nullptr)};
|
||||
|
||||
@ -208,7 +211,6 @@ LocalDateTime MySQLDictionarySource::getLastModification() const
|
||||
|
||||
try
|
||||
{
|
||||
auto connection = pool.Get();
|
||||
auto query = connection->query("SHOW TABLE STATUS LIKE " + quoteForLike(table));
|
||||
|
||||
LOG_TRACE(log, query.str());
|
||||
@ -233,6 +235,11 @@ LocalDateTime MySQLDictionarySource::getLastModification() const
|
||||
++fetched_rows;
|
||||
}
|
||||
|
||||
if (close_connection && allow_connection_closure)
|
||||
{
|
||||
connection.disconnect();
|
||||
}
|
||||
|
||||
if (0 == fetched_rows)
|
||||
LOG_ERROR(log, "Cannot find table in SHOW TABLE STATUS result.");
|
||||
|
||||
@ -243,7 +250,6 @@ LocalDateTime MySQLDictionarySource::getLastModification() const
|
||||
{
|
||||
tryLogCurrentException("MySQLDictionarySource");
|
||||
}
|
||||
|
||||
/// we suppose failure to get modification time is not an error, therefore return current time
|
||||
return modification_time;
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ private:
|
||||
|
||||
static std::string quoteForLike(const std::string s);
|
||||
|
||||
LocalDateTime getLastModification() const;
|
||||
LocalDateTime getLastModification(mysqlxx::Pool::Entry & connection, bool allow_connection_closure) const;
|
||||
|
||||
// execute invalidate_query. expects single cell in result
|
||||
std::string doInvalidateQuery(const std::string & request) const;
|
||||
|
@ -1,5 +1,10 @@
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
#include <Dictionaries/DictionaryFactory.h>
|
||||
#include "config_core.h"
|
||||
|
||||
#if USE_MYSQL
|
||||
# include <mysqlxx/PoolFactory.h>
|
||||
#endif
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -24,4 +29,12 @@ ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
|
||||
bool dictionary_from_database = !repository_name.empty();
|
||||
return DictionaryFactory::instance().create(name, config, key_in_config, context, dictionary_from_database);
|
||||
}
|
||||
|
||||
void ExternalDictionariesLoader::resetAll()
|
||||
{
|
||||
#if USE_MYSQL
|
||||
mysqlxx::PoolFactory::instance().reset();
|
||||
#endif
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -4,7 +4,6 @@
|
||||
#include <Interpreters/ExternalLoader.h>
|
||||
#include <memory>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class Context;
|
||||
@ -29,6 +28,8 @@ public:
|
||||
return std::static_pointer_cast<const IDictionaryBase>(tryLoad(name));
|
||||
}
|
||||
|
||||
static void resetAll();
|
||||
|
||||
protected:
|
||||
LoadablePtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & key_in_config, const std::string & repository_name) const override;
|
||||
|
@ -216,6 +216,7 @@ BlockIO InterpreterSystemQuery::execute()
|
||||
case Type::RELOAD_DICTIONARY:
|
||||
context.checkAccess(AccessType::RELOAD_DICTIONARY);
|
||||
system_context.getExternalDictionariesLoader().loadOrReload(query.target_dictionary);
|
||||
ExternalDictionariesLoader::resetAll();
|
||||
break;
|
||||
case Type::RELOAD_DICTIONARIES:
|
||||
context.checkAccess(AccessType::RELOAD_DICTIONARY);
|
||||
@ -223,6 +224,7 @@ BlockIO InterpreterSystemQuery::execute()
|
||||
[&] () { system_context.getExternalDictionariesLoader().reloadAllTriedToLoad(); },
|
||||
[&] () { system_context.getEmbeddedDictionaries().reload(); }
|
||||
);
|
||||
ExternalDictionariesLoader::resetAll();
|
||||
break;
|
||||
case Type::RELOAD_EMBEDDED_DICTIONARIES:
|
||||
context.checkAccess(AccessType::RELOAD_DICTIONARY);
|
||||
|
@ -0,0 +1,30 @@
|
||||
<?xml version="1.0"?>
|
||||
<yandex>
|
||||
<logger>
|
||||
<level>trace</level>
|
||||
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
|
||||
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
|
||||
<size>1000M</size>
|
||||
<count>10</count>
|
||||
</logger>
|
||||
|
||||
<tcp_port>9000</tcp_port>
|
||||
<listen_host>127.0.0.1</listen_host>
|
||||
|
||||
<openSSL>
|
||||
<client>
|
||||
<cacheSessions>true</cacheSessions>
|
||||
<verificationMode>none</verificationMode>
|
||||
<invalidCertificateHandler>
|
||||
<name>AcceptCertificateHandler</name>
|
||||
</invalidCertificateHandler>
|
||||
</client>
|
||||
</openSSL>
|
||||
|
||||
<max_concurrent_queries>500</max_concurrent_queries>
|
||||
<mark_cache_size>5368709120</mark_cache_size>
|
||||
<path>./clickhouse/</path>
|
||||
<users_config>users.xml</users_config>
|
||||
|
||||
<dictionaries_config>/etc/clickhouse-server/config.d/*.xml</dictionaries_config>
|
||||
</yandex>
|
@ -0,0 +1,75 @@
|
||||
<?xml version="1.0"?>
|
||||
<yandex>
|
||||
<dictionary>
|
||||
<name>dict0</name>
|
||||
<source>
|
||||
<mysql >
|
||||
<db>test</db>
|
||||
<host>mysql1</host>
|
||||
<port>3306</port>
|
||||
<user>root</user>
|
||||
<password>clickhouse</password>
|
||||
<table>test0</table>
|
||||
<close_connection>true</close_connection>
|
||||
<share_connection>true</share_connection>
|
||||
</mysql>
|
||||
</source>
|
||||
<layout>
|
||||
<hashed/>
|
||||
</layout>
|
||||
<structure>
|
||||
<id>
|
||||
<name>id</name>
|
||||
<type>UInt32</type>
|
||||
<expression>CAST(id AS UNSIGNED)</expression>
|
||||
</id>
|
||||
<attribute>
|
||||
<name>id</name>
|
||||
<type>Int32</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>value</name>
|
||||
<type>String</type>
|
||||
<null_value>(UNDEFINED)</null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
<lifetime>0</lifetime>
|
||||
</dictionary>
|
||||
<dictionary>
|
||||
<name>dict1</name>
|
||||
<source>
|
||||
<mysql >
|
||||
<db>test</db>
|
||||
<host>mysql1</host>
|
||||
<port>3306</port>
|
||||
<user>root</user>
|
||||
<password>clickhouse</password>
|
||||
<table>test1</table>
|
||||
<close_connection>true</close_connection>
|
||||
<share_connection>true</share_connection>
|
||||
</mysql>
|
||||
</source>
|
||||
<layout>
|
||||
<hashed/>
|
||||
</layout>
|
||||
<structure>
|
||||
<id>
|
||||
<name>id</name>
|
||||
<type>UInt32</type>
|
||||
<expression>CAST(id AS UNSIGNED)</expression>
|
||||
</id>
|
||||
<attribute>
|
||||
<name>id</name>
|
||||
<type>Int32</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>value</name>
|
||||
<type>String</type>
|
||||
<null_value>(UNDEFINED)</null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
<lifetime>0</lifetime>
|
||||
</dictionary>
|
||||
</yandex>
|
@ -0,0 +1,113 @@
|
||||
<?xml version="1.0"?>
|
||||
<yandex>
|
||||
<dictionary>
|
||||
<name>dict2</name>
|
||||
<source>
|
||||
<mysql >
|
||||
<db>test</db>
|
||||
<host>mysql1</host>
|
||||
<port>3306</port>
|
||||
<user>root</user>
|
||||
<password>clickhouse</password>
|
||||
<table>test2</table>
|
||||
<close_connection>true</close_connection>
|
||||
<share_connection>true</share_connection>
|
||||
</mysql>
|
||||
</source>
|
||||
<layout>
|
||||
<hashed/>
|
||||
</layout>
|
||||
<structure>
|
||||
<id>
|
||||
<name>id</name>
|
||||
<type>UInt32</type>
|
||||
<expression>CAST(id AS UNSIGNED)</expression>
|
||||
</id>
|
||||
<attribute>
|
||||
<name>id</name>
|
||||
<type>Int32</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>value</name>
|
||||
<type>String</type>
|
||||
<null_value>(UNDEFINED)</null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
<lifetime>0</lifetime>
|
||||
</dictionary>
|
||||
|
||||
<dictionary>
|
||||
<name>dict3</name>
|
||||
<source>
|
||||
<mysql >
|
||||
<db>test</db>
|
||||
<host>mysql1</host>
|
||||
<port>3306</port>
|
||||
<user>root</user>
|
||||
<password>clickhouse</password>
|
||||
<table>test3</table>
|
||||
<close_connection>true</close_connection>
|
||||
<share_connection>true</share_connection>
|
||||
</mysql>
|
||||
</source>
|
||||
<layout>
|
||||
<hashed/>
|
||||
</layout>
|
||||
<structure>
|
||||
<id>
|
||||
<name>id</name>
|
||||
<type>UInt32</type>
|
||||
<expression>CAST(id AS UNSIGNED)</expression>
|
||||
</id>
|
||||
<attribute>
|
||||
<name>id</name>
|
||||
<type>Int32</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>value</name>
|
||||
<type>String</type>
|
||||
<null_value>(UNDEFINED)</null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
<lifetime>0</lifetime>
|
||||
</dictionary>
|
||||
<dictionary>
|
||||
<name>dict4</name>
|
||||
<source>
|
||||
<mysql >
|
||||
<db>test</db>
|
||||
<host>mysql1</host>
|
||||
<port>3306</port>
|
||||
<user>root</user>
|
||||
<password>clickhouse</password>
|
||||
<table>test4</table>
|
||||
<close_connection>true</close_connection>
|
||||
<share_connection>true</share_connection>
|
||||
</mysql>
|
||||
</source>
|
||||
<layout>
|
||||
<hashed/>
|
||||
</layout>
|
||||
<structure>
|
||||
<id>
|
||||
<name>id</name>
|
||||
<type>UInt32</type>
|
||||
<expression>CAST(id AS UNSIGNED)</expression>
|
||||
</id>
|
||||
<attribute>
|
||||
<name>id</name>
|
||||
<type>Int32</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>value</name>
|
||||
<type>String</type>
|
||||
<null_value>(UNDEFINED)</null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
<lifetime>0</lifetime>
|
||||
</dictionary>
|
||||
|
||||
</yandex>
|
@ -0,0 +1,12 @@
|
||||
<yandex>
|
||||
<remote_servers>
|
||||
<test_cluster>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>instance</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_cluster>
|
||||
</remote_servers>
|
||||
</yandex>
|
@ -0,0 +1,23 @@
|
||||
<?xml version="1.0"?>
|
||||
<yandex>
|
||||
<profiles>
|
||||
<default>
|
||||
</default>
|
||||
</profiles>
|
||||
|
||||
<users>
|
||||
<default>
|
||||
<password></password>
|
||||
<networks incl="networks" replace="replace">
|
||||
<ip>::/0</ip>
|
||||
</networks>
|
||||
<profile>default</profile>
|
||||
<quota>default</quota>
|
||||
</default>
|
||||
</users>
|
||||
|
||||
<quotas>
|
||||
<default>
|
||||
</default>
|
||||
</quotas>
|
||||
</yandex>
|
95
dbms/tests/integration/test_dictionaries_mysql/test.py
Normal file
95
dbms/tests/integration/test_dictionaries_mysql/test.py
Normal file
@ -0,0 +1,95 @@
|
||||
import pytest
|
||||
import os
|
||||
import time
|
||||
|
||||
## sudo -H pip install PyMySQL
|
||||
import pymysql.cursors
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import assert_eq_with_retry
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
CONFIG_FILES = ['configs/dictionaries/mysql_dict1.xml', 'configs/dictionaries/mysql_dict2.xml', 'configs/remote_servers.xml']
|
||||
|
||||
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
|
||||
instance = cluster.add_instance('instance', main_configs=CONFIG_FILES, with_mysql = True)
|
||||
|
||||
create_table_mysql_template = """
|
||||
CREATE TABLE IF NOT EXISTS `test`.`{}` (
|
||||
`id` int(11) NOT NULL,
|
||||
`value` varchar(50) NOT NULL,
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE=InnoDB;
|
||||
"""
|
||||
|
||||
create_clickhouse_dictionary_table_template = """
|
||||
CREATE TABLE IF NOT EXISTS `test`.`dict_table_{}` (`id` Int32, `value` String) ENGINE = Dictionary({})
|
||||
"""
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
#time.sleep(30)
|
||||
cluster.start()
|
||||
|
||||
# Create a MySQL database
|
||||
mysql_connection = get_mysql_conn()
|
||||
create_mysql_db(mysql_connection, 'test')
|
||||
mysql_connection.close()
|
||||
|
||||
# Create database in ClickHouse
|
||||
instance.query("CREATE DATABASE IF NOT EXISTS test")
|
||||
|
||||
# Create database in ClickChouse using MySQL protocol (will be used for data insertion)
|
||||
instance.query("CREATE DATABASE clickhouse_mysql ENGINE = MySQL('mysql1:3306', 'test', 'root', 'clickhouse')")
|
||||
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_load_mysql_dictionaries(started_cluster):
|
||||
# Load dictionaries
|
||||
query = instance.query
|
||||
query("SYSTEM RELOAD DICTIONARIES")
|
||||
|
||||
for n in range(0, 5):
|
||||
# Create MySQL tables, fill them and create CH dict tables
|
||||
prepare_mysql_table('test', str(n))
|
||||
|
||||
# Check dictionaries are loaded and have correct number of elements
|
||||
for n in range(0, 100):
|
||||
# Force reload of dictionaries (each 10 iteration)
|
||||
if (n % 10) == 0:
|
||||
query("SYSTEM RELOAD DICTIONARIES")
|
||||
|
||||
# Check number of row
|
||||
assert query("SELECT count() FROM `test`.`dict_table_{}`".format('test' + str(n % 5))).rstrip() == '10000'
|
||||
|
||||
def create_mysql_db(mysql_connection, name):
|
||||
with mysql_connection.cursor() as cursor:
|
||||
cursor.execute("CREATE DATABASE IF NOT EXISTS {} DEFAULT CHARACTER SET 'utf8'".format(name))
|
||||
|
||||
def prepare_mysql_table(table_name, index):
|
||||
mysql_connection = get_mysql_conn()
|
||||
|
||||
# Create table
|
||||
create_mysql_table(mysql_connection, table_name + str(index))
|
||||
|
||||
# Insert rows using CH
|
||||
query = instance.query
|
||||
query("INSERT INTO `clickhouse_mysql`.{}(id, value) select number, concat('{} value ', toString(number)) from numbers(10000) ".format(table_name + str(index), table_name + str(index)))
|
||||
assert query("SELECT count() FROM `clickhouse_mysql`.{}".format(table_name + str(index))).rstrip() == '10000'
|
||||
mysql_connection.close()
|
||||
|
||||
#Create CH Dictionary tables based on MySQL tables
|
||||
query(create_clickhouse_dictionary_table_template.format(table_name + str(index), 'dict' + str(index)))
|
||||
|
||||
def get_mysql_conn():
|
||||
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.10', port=3308)
|
||||
return conn
|
||||
|
||||
def create_mysql_table(conn, table_name):
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute(create_table_mysql_template.format(table_name))
|
Loading…
Reference in New Issue
Block a user