Do not expose MySQL headers in headers of mysqlxx library [#CLICKHOUSE-2716].

This commit is contained in:
Alexey Milovidov 2017-01-13 23:37:37 +03:00
parent 61d3936b23
commit fdc2464c78
16 changed files with 443 additions and 365 deletions

View File

@ -1,5 +1,6 @@
#pragma once
#include <memory>
#include <boost/noncopyable.hpp>
#include <Poco/Util/Application.h>
@ -26,16 +27,8 @@ class LibrarySingleton : public Singleton<LibrarySingleton>
{
friend class Singleton<LibrarySingleton>;
private:
LibrarySingleton()
{
if (mysql_library_init(0, nullptr, nullptr))
throw Exception("Cannot initialize MySQL library.");
}
~LibrarySingleton()
{
mysql_library_end();
}
LibrarySingleton();
~LibrarySingleton();
};
@ -70,11 +63,7 @@ public:
/** Конструктор-помошник. Создать соединение, считав все параметры из секции config_name конфигурации.
* Можно использовать, если вы используете Poco::Util::Application из библиотеки Poco.
*/
Connection(const std::string & config_name)
{
is_connected = false;
connect(config_name);
}
Connection(const std::string & config_name);
virtual ~Connection();
@ -126,7 +115,7 @@ public:
MYSQL * getDriver();
private:
MYSQL driver;
std::unique_ptr<MYSQL> driver;
bool is_connected;
};

View File

@ -1,8 +1,6 @@
#pragma once
#include <sstream>
#include <mysql.h>
#include <Poco/Exception.h>
@ -48,28 +46,10 @@ struct CannotParseValue : public Exception
};
inline std::string errorMessage(MYSQL * driver)
{
std::stringstream res;
res << mysql_error(driver) << " (" << driver->host << ":" << driver->port << ")";
return res.str();
}
std::string errorMessage(MYSQL * driver);
/// Для внутренних нужд библиотеки.
inline void checkError(MYSQL * driver)
{
unsigned num = mysql_errno(driver);
if (num)
throw Exception(errorMessage(driver), num);
}
/// Для внутренних нужд библиотеки.
inline void onError(MYSQL * driver)
{
throw Exception(errorMessage(driver), mysql_errno(driver));
}
/// For internal need of library.
void checkError(MYSQL * driver);
void onError(MYSQL * driver);
}

View File

@ -3,8 +3,6 @@
#include <list>
#include <memory>
#include <mysqld_error.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Poco/NumberFormatter.h>
@ -42,10 +40,8 @@ protected:
/** Информация о соединении. */
struct Connection
{
Connection() : ref_count(0) {}
mysqlxx::Connection conn;
int ref_count;
int ref_count = 0;
};
public:
@ -84,32 +80,24 @@ public:
operator mysqlxx::Connection & ()
{
if (data == nullptr)
throw Poco::RuntimeException("Tried to access NULL database connection.");
forceConnected();
return data->conn;
}
operator const mysqlxx::Connection & () const
{
if (data == nullptr)
throw Poco::RuntimeException("Tried to access NULL database connection.");
forceConnected();
return data->conn;
}
const mysqlxx::Connection * operator->() const
{
if (data == nullptr)
throw Poco::RuntimeException("Tried to access NULL database connection.");
forceConnected();
return &data->conn;
}
mysqlxx::Connection * operator->()
{
if (data == nullptr)
throw Poco::RuntimeException("Tried to access NULL database connection.");
forceConnected();
return &data->conn;
}
@ -127,6 +115,7 @@ public:
else
return "pool is null";
}
friend class Pool;
private:
@ -138,6 +127,9 @@ public:
/** Переподключается к базе данных в случае необходимости. Если не удалось - подождать и попробовать снова. */
void forceConnected() const
{
if (data == nullptr)
throw Poco::RuntimeException("Tried to access NULL database connection.");
Poco::Util::Application & app = Poco::Util::Application::instance();
if (data->conn.ping())
@ -171,21 +163,8 @@ public:
}
void incrementRefCount()
{
if (!data)
return;
++data->ref_count;
mysql_thread_init();
}
void decrementRefCount()
{
if (!data)
return;
--data->ref_count;
mysql_thread_end();
}
void incrementRefCount();
void decrementRefCount();
};
@ -274,73 +253,16 @@ public:
Pool & operator=(const Pool &) = delete;
~Pool()
{
Poco::ScopedLock<Poco::FastMutex> locker(lock);
for (Connections::iterator it = connections.begin(); it != connections.end(); it++)
delete static_cast<Connection *>(*it);
}
~Pool();
/** Выделяет соединение для работы. */
Entry Get()
{
Poco::ScopedLock<Poco::FastMutex> locker(lock);
initialize();
for (;;)
{
for (Connections::iterator it = connections.begin(); it != connections.end(); it++)
{
if ((*it)->ref_count == 0)
return Entry(*it, this);
}
if (connections.size() < static_cast<size_t>(max_connections))
{
Connection * conn = allocConnection();
if (conn)
return Entry(conn, this);
}
lock.unlock();
::sleep(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
lock.lock();
}
}
Entry Get();
/** Выделяет соединение для работы.
* Если база недоступна - возвращает пустой объект Entry.
* Если пул переполнен - кидает исключение.
*/
Entry tryGet()
{
Poco::ScopedLock<Poco::FastMutex> locker(lock);
initialize();
/// Поиск уже установленного, но не использующегося сейчас соединения.
for (Connections::iterator it = connections.begin(); it != connections.end(); ++it)
{
if ((*it)->ref_count == 0)
{
Entry res(*it, this);
return res.tryForceConnected() ? res : Entry();
}
}
/// Если пул переполнен.
if (connections.size() >= max_connections)
throw Poco::Exception("mysqlxx::Pool is full");
/// Выделение нового соединения.
Connection * conn = allocConnection(true);
if (conn)
return Entry(conn, this);
return Entry();
}
Entry tryGet();
/// Получить описание БД
std::string getDescription() const
@ -379,61 +301,10 @@ private:
bool was_successful{false};
/** Выполняет инициализацию класса, если мы еще не инициализированы. */
void initialize()
{
if (!initialized)
{
description = db + "@" + server + ":" + Poco::NumberFormatter::format(port) + " as user " + user;
void initialize();
for (unsigned i = 0; i < default_connections; i++)
allocConnection();
initialized = true;
}
}
/** Создает новое соединение. */
Connection * allocConnection(bool dont_throw_if_failed_first_time = false)
{
Poco::Util::Application & app = Poco::Util::Application::instance();
std::unique_ptr<Connection> conn(new Connection);
try
{
app.logger().information("MYSQL: Connecting to " + description);
conn->conn.connect(
db.c_str(),
server.c_str(),
user.c_str(),
password.c_str(),
port,
connect_timeout,
rw_timeout);
}
catch (mysqlxx::ConnectionFailed & e)
{
if ((!was_successful && !dont_throw_if_failed_first_time)
|| e.errnum() == ER_ACCESS_DENIED_ERROR
|| e.errnum() == ER_DBACCESS_DENIED_ERROR
|| e.errnum() == ER_BAD_DB_ERROR)
{
app.logger().error(e.what());
throw;
}
else
{
app.logger().error(e.what());
return nullptr;
}
}
was_successful = true;
auto * connection = conn.release();
connections.push_back(connection);
return connection;
}
/** Create new connection. */
Connection * allocConnection(bool dont_throw_if_failed_first_time = false);
};
}

View File

@ -1,7 +1,6 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <mysqlxx/Types.h>
@ -29,10 +28,7 @@ public:
MYSQL_RES * getRes() { return res; }
const Query * getQuery() const { return query; }
virtual ~ResultBase()
{
mysql_free_result(res);
}
virtual ~ResultBase();
protected:
MYSQL_RES * res;

View File

@ -51,18 +51,8 @@ public:
return Value(row[n], lengths[n], res);
}
/** Получить значение по имени. Слегка менее эффективно. */
Value operator[] (const char * name) const
{
unsigned n = res->getNumFields();
MYSQL_FIELDS fields = res->getFields();
for (unsigned i = 0; i < n; ++i)
if (!strcmp(name, fields[i].name))
return operator[](i);
throw Exception(std::string("Unknown column ") + name);
}
/** Get value by column name. Less efficient. */
Value operator[] (const char * name) const;
Value operator[] (const std::string & name) const
{

View File

@ -1,13 +1,25 @@
#pragma once
#include <string>
#include <mysql.h>
#include <Poco/Types.h>
#include <common/LocalDate.h>
#include <common/LocalDateTime.h>
struct st_mysql;
using MYSQL = st_mysql;
struct st_mysql_res;
using MYSQL_RES = st_mysql_res;
using MYSQL_ROW = char**;
struct st_mysql_field;
using MYSQL_FIELD = st_mysql_field;
namespace mysqlxx
{

View File

@ -224,154 +224,13 @@ private:
/// Прочитать беззнаковое целое в простом формате из не-0-terminated строки.
UInt64 readUIntText(const char * buf, size_t length) const
{
UInt64 x = 0;
const char * end = buf + length;
while (buf != end)
{
switch (*buf)
{
case '+':
break;
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
x *= 10;
x += *buf - '0';
break;
default:
throwException("Cannot parse unsigned integer");
}
++buf;
}
return x;
}
UInt64 readUIntText(const char * buf, size_t length) const;
/// Прочитать знаковое целое в простом формате из не-0-terminated строки.
Int64 readIntText(const char * buf, size_t length) const
{
bool negative = false;
Int64 x = 0;
const char * end = buf + length;
while (buf != end)
{
switch (*buf)
{
case '+':
break;
case '-':
negative = true;
break;
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
x *= 10;
x += *buf - '0';
break;
default:
throwException("Cannot parse signed integer");
}
++buf;
}
if (negative)
x = -x;
return x;
}
Int64 readIntText(const char * buf, size_t length) const;
/// Прочитать число с плавающей запятой в простом формате, с грубым округлением, из не-0-terminated строки.
double readFloatText(const char * buf, size_t length) const
{
bool negative = false;
double x = 0;
bool after_point = false;
double power_of_ten = 1;
const char * end = buf + length;
while (buf != end)
{
switch (*buf)
{
case '+':
break;
case '-':
negative = true;
break;
case '.':
after_point = true;
break;
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
if (after_point)
{
power_of_ten /= 10;
x += (*buf - '0') * power_of_ten;
}
else
{
x *= 10;
x += *buf - '0';
}
break;
case 'e':
case 'E':
{
++buf;
Int32 exponent = readIntText(buf, end - buf);
x *= exp10(exponent);
if (negative)
x = -x;
return x;
}
case 'i':
case 'I':
x = std::numeric_limits<double>::infinity();
if (negative)
x = -x;
return x;
case 'n':
case 'N':
x = std::numeric_limits<double>::quiet_NaN();
return x;
default:
throwException("Cannot parse floating point number");
}
++buf;
}
if (negative)
x = -x;
return x;
}
double readFloatText(const char * buf, size_t length) const;
/// Выкинуть исключение с подробной информацией
void throwException(const char * text) const;

View File

@ -1,3 +1,4 @@
#include <mysql.h>
#include <mysqlxx/Connection.h>
#include <mysqlxx/Exception.h>
@ -5,8 +6,20 @@
namespace mysqlxx
{
LibrarySingleton::LibrarySingleton()
{
if (mysql_library_init(0, nullptr, nullptr))
throw Exception("Cannot initialize MySQL library.");
}
LibrarySingleton::~LibrarySingleton()
{
mysql_library_end();
}
Connection::Connection()
: driver(std::make_unique<MYSQL>())
{
is_connected = false;
@ -22,11 +35,19 @@ Connection::Connection(
unsigned port,
unsigned timeout,
unsigned rw_timeout)
: driver(std::make_unique<MYSQL>())
{
is_connected = false;
connect(db, server, user, password, port, timeout, rw_timeout);
}
Connection::Connection(const std::string & config_name)
: driver(std::make_unique<MYSQL>())
{
is_connected = false;
connect(config_name);
}
Connection::~Connection()
{
disconnect();
@ -47,36 +68,36 @@ void Connection::connect(const char* db,
/// Инициализация библиотеки.
LibrarySingleton::instance();
if (!mysql_init(&driver))
throw ConnectionFailed(errorMessage(&driver), mysql_errno(&driver));
if (!mysql_init(driver.get()))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
/// Установим таймауты
if (mysql_options(&driver, MYSQL_OPT_CONNECT_TIMEOUT, reinterpret_cast<const char *>(&timeout)))
throw ConnectionFailed(errorMessage(&driver), mysql_errno(&driver));
if (mysql_options(driver.get(), MYSQL_OPT_CONNECT_TIMEOUT, reinterpret_cast<const char *>(&timeout)))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
if (mysql_options(&driver, MYSQL_OPT_READ_TIMEOUT, reinterpret_cast<const char *>(&rw_timeout)))
throw ConnectionFailed(errorMessage(&driver), mysql_errno(&driver));
if (mysql_options(driver.get(), MYSQL_OPT_READ_TIMEOUT, reinterpret_cast<const char *>(&rw_timeout)))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
if (mysql_options(&driver, MYSQL_OPT_WRITE_TIMEOUT, reinterpret_cast<const char *>(&rw_timeout)))
throw ConnectionFailed(errorMessage(&driver), mysql_errno(&driver));
if (mysql_options(driver.get(), MYSQL_OPT_WRITE_TIMEOUT, reinterpret_cast<const char *>(&rw_timeout)))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
/** Включаем возможность использовать запрос LOAD DATA LOCAL INFILE с серверами,
* которые были скомпилированы без опции --enable-local-infile.
*/
if (mysql_options(&driver, MYSQL_OPT_LOCAL_INFILE, nullptr))
throw ConnectionFailed(errorMessage(&driver), mysql_errno(&driver));
if (mysql_options(driver.get(), MYSQL_OPT_LOCAL_INFILE, nullptr))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
if (!mysql_real_connect(&driver, server, user, password, db, port, nullptr, driver.client_flag))
throw ConnectionFailed(errorMessage(&driver), mysql_errno(&driver));
if (!mysql_real_connect(driver.get(), server, user, password, db, port, nullptr, driver->client_flag))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
/// Установим кодировки по умолчанию - UTF-8.
if (mysql_set_character_set(&driver, "UTF8"))
throw ConnectionFailed(errorMessage(&driver), mysql_errno(&driver));
if (mysql_set_character_set(driver.get(), "UTF8"))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
/// Установим автоматический реконнект
my_bool reconnect = true;
if (mysql_options(&driver, MYSQL_OPT_RECONNECT, reinterpret_cast<const char *>(&reconnect)))
throw ConnectionFailed(errorMessage(&driver), mysql_errno(&driver));
if (mysql_options(driver.get(), MYSQL_OPT_RECONNECT, reinterpret_cast<const char *>(&reconnect)))
throw ConnectionFailed(errorMessage(driver.get()), mysql_errno(driver.get()));
is_connected = true;
}
@ -91,14 +112,14 @@ void Connection::disconnect()
if (!is_connected)
return;
mysql_close(&driver);
memset(&driver, 0, sizeof(driver));
mysql_close(driver.get());
memset(driver.get(), 0, sizeof(*driver));
is_connected = false;
}
bool Connection::ping()
{
return is_connected && !mysql_ping(&driver);
return is_connected && !mysql_ping(driver.get());
}
Query Connection::query(const std::string & str)
@ -108,7 +129,7 @@ Query Connection::query(const std::string & str)
MYSQL * Connection::getDriver()
{
return &driver;
return driver.get();
}
}

View File

@ -0,0 +1,32 @@
#include <mysql.h>
#include <mysqlxx/Exception.h>
namespace mysqlxx
{
std::string errorMessage(MYSQL * driver)
{
std::stringstream res;
res << mysql_error(driver) << " (" << driver->host << ":" << driver->port << ")";
return res.str();
}
/// Для внутренних нужд библиотеки.
void checkError(MYSQL * driver)
{
unsigned num = mysql_errno(driver);
if (num)
throw Exception(errorMessage(driver), num);
}
/// Для внутренних нужд библиотеки.
void onError(MYSQL * driver)
{
throw Exception(errorMessage(driver), mysql_errno(driver));
}
}

View File

@ -0,0 +1,147 @@
#include <mysql.h>
#include <mysqld_error.h>
#include <mysqlxx/Pool.h>
namespace mysqlxx
{
void Pool::Entry::incrementRefCount()
{
if (!data)
return;
++data->ref_count;
mysql_thread_init();
}
void Pool::Entry::decrementRefCount()
{
if (!data)
return;
--data->ref_count;
mysql_thread_end();
}
Pool::~Pool()
{
Poco::ScopedLock<Poco::FastMutex> locker(lock);
for (Connections::iterator it = connections.begin(); it != connections.end(); it++)
delete static_cast<Connection *>(*it);
}
Pool::Entry Pool::Get()
{
Poco::ScopedLock<Poco::FastMutex> locker(lock);
initialize();
for (;;)
{
for (Connections::iterator it = connections.begin(); it != connections.end(); it++)
{
if ((*it)->ref_count == 0)
return Entry(*it, this);
}
if (connections.size() < static_cast<size_t>(max_connections))
{
Connection * conn = allocConnection();
if (conn)
return Entry(conn, this);
}
lock.unlock();
::sleep(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
lock.lock();
}
}
Pool::Entry Pool::tryGet()
{
Poco::ScopedLock<Poco::FastMutex> locker(lock);
initialize();
/// Поиск уже установленного, но не использующегося сейчас соединения.
for (Connections::iterator it = connections.begin(); it != connections.end(); ++it)
{
if ((*it)->ref_count == 0)
{
Entry res(*it, this);
return res.tryForceConnected() ? res : Entry();
}
}
/// Если пул переполнен.
if (connections.size() >= max_connections)
throw Poco::Exception("mysqlxx::Pool is full");
/// Выделение нового соединения.
Connection * conn = allocConnection(true);
if (conn)
return Entry(conn, this);
return Entry();
}
void Pool::initialize()
{
if (!initialized)
{
description = db + "@" + server + ":" + Poco::NumberFormatter::format(port) + " as user " + user;
for (unsigned i = 0; i < default_connections; i++)
allocConnection();
initialized = true;
}
}
Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
{
Poco::Util::Application & app = Poco::Util::Application::instance();
std::unique_ptr<Connection> conn(new Connection);
try
{
app.logger().information("MYSQL: Connecting to " + description);
conn->conn.connect(
db.c_str(),
server.c_str(),
user.c_str(),
password.c_str(),
port,
connect_timeout,
rw_timeout);
}
catch (mysqlxx::ConnectionFailed & e)
{
if ((!was_successful && !dont_throw_if_failed_first_time)
|| e.errnum() == ER_ACCESS_DENIED_ERROR
|| e.errnum() == ER_DBACCESS_DENIED_ERROR
|| e.errnum() == ER_BAD_DB_ERROR)
{
app.logger().error(e.what());
throw;
}
else
{
app.logger().error(e.what());
return nullptr;
}
}
was_successful = true;
auto * connection = conn.release();
connections.push_back(connection);
return connection;
}
}

View File

@ -1,3 +1,4 @@
#include <mysql.h>
#include <mysqlxx/Connection.h>
#include <mysqlxx/Query.h>

View File

@ -1,3 +1,4 @@
#include <mysql.h>
#include <mysqlxx/Connection.h>
#include <mysqlxx/ResultBase.h>
@ -11,4 +12,9 @@ ResultBase::ResultBase(MYSQL_RES * res_, Connection * conn_, const Query * query
num_fields = mysql_num_fields(res);
}
ResultBase::~ResultBase()
{
mysql_free_result(res);
}
}

View File

@ -0,0 +1,20 @@
#include <mysql.h>
#include <mysqlxx/Row.h>
namespace mysqlxx
{
Value Row::operator[] (const char * name) const
{
unsigned n = res->getNumFields();
MYSQL_FIELDS fields = res->getFields();
for (unsigned i = 0; i < n; ++i)
if (!strcmp(name, fields[i].name))
return operator[](i);
throw Exception(std::string("Unknown column ") + name);
}
}

View File

@ -1,3 +1,4 @@
#include <mysql.h>
#include <mysqlxx/Connection.h>
#include <mysqlxx/StoreQueryResult.h>

View File

@ -1,3 +1,4 @@
#include <mysql.h>
#include <mysqlxx/Connection.h>
#include <mysqlxx/UseQueryResult.h>

View File

@ -4,7 +4,157 @@
#include <mysqlxx/Exception.h>
void mysqlxx::Value::throwException(const char * text) const
namespace mysqlxx
{
UInt64 Value::readUIntText(const char * buf, size_t length) const
{
UInt64 x = 0;
const char * end = buf + length;
while (buf != end)
{
switch (*buf)
{
case '+':
break;
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
x *= 10;
x += *buf - '0';
break;
default:
throwException("Cannot parse unsigned integer");
}
++buf;
}
return x;
}
Int64 Value::readIntText(const char * buf, size_t length) const
{
bool negative = false;
Int64 x = 0;
const char * end = buf + length;
while (buf != end)
{
switch (*buf)
{
case '+':
break;
case '-':
negative = true;
break;
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
x *= 10;
x += *buf - '0';
break;
default:
throwException("Cannot parse signed integer");
}
++buf;
}
if (negative)
x = -x;
return x;
}
double Value::readFloatText(const char * buf, size_t length) const
{
bool negative = false;
double x = 0;
bool after_point = false;
double power_of_ten = 1;
const char * end = buf + length;
while (buf != end)
{
switch (*buf)
{
case '+':
break;
case '-':
negative = true;
break;
case '.':
after_point = true;
break;
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
if (after_point)
{
power_of_ten /= 10;
x += (*buf - '0') * power_of_ten;
}
else
{
x *= 10;
x += *buf - '0';
}
break;
case 'e':
case 'E':
{
++buf;
Int32 exponent = readIntText(buf, end - buf);
x *= exp10(exponent);
if (negative)
x = -x;
return x;
}
case 'i':
case 'I':
x = std::numeric_limits<double>::infinity();
if (negative)
x = -x;
return x;
case 'n':
case 'N':
x = std::numeric_limits<double>::quiet_NaN();
return x;
default:
throwException("Cannot parse floating point number");
}
++buf;
}
if (negative)
x = -x;
return x;
}
void Value::throwException(const char * text) const
{
std::stringstream info;
info << text;
@ -20,3 +170,5 @@ void mysqlxx::Value::throwException(const char * text) const
throw CannotParseValue(info.str());
}
}