mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
dbms: MysqlDictionarySource with draft MysqlBockInputStream
This commit is contained in:
parent
419d517867
commit
1b4b0d5779
@ -13,10 +13,10 @@ namespace DB
|
|||||||
class DictionaryFactory : public Singleton<DictionaryFactory>
|
class DictionaryFactory : public Singleton<DictionaryFactory>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DictionaryPtr create(const Poco::Util::XMLConfiguration & config, const std::string & config_prefix,
|
DictionaryPtr create(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
|
||||||
const Context & context) const
|
const Context & context) const
|
||||||
{
|
{
|
||||||
auto dict_struct = DictionaryStructure::fromXML(config, config_prefix + "structure");
|
auto dict_struct = DictionaryStructure::fromConfig(config, config_prefix + "structure");
|
||||||
|
|
||||||
auto source_ptr = DictionarySourceFactory::instance().create(
|
auto source_ptr = DictionarySourceFactory::instance().create(
|
||||||
config, config_prefix + "source.", dict_struct, context);
|
config, config_prefix + "source.", dict_struct, context);
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
#include <DB/Core/Block.h>
|
#include <DB/Core/Block.h>
|
||||||
#include <DB/Dictionaries/DictionaryStructure.h>
|
#include <DB/Dictionaries/DictionaryStructure.h>
|
||||||
#include <DB/Dictionaries/FileDictionarySource.h>
|
#include <DB/Dictionaries/FileDictionarySource.h>
|
||||||
|
#include <DB/Dictionaries/MysqlDictionarySource.h>
|
||||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||||
#include <Yandex/singleton.h>
|
#include <Yandex/singleton.h>
|
||||||
#include <statdaemons/ext/memory.hpp>
|
#include <statdaemons/ext/memory.hpp>
|
||||||
@ -39,7 +40,7 @@ Block createSampleBlock(const DictionaryStructure & dict_struct, const Context &
|
|||||||
class DictionarySourceFactory : public Singleton<DictionarySourceFactory>
|
class DictionarySourceFactory : public Singleton<DictionarySourceFactory>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DictionarySourcePtr create(const Poco::Util::AbstractConfiguration & config,
|
DictionarySourcePtr create(Poco::Util::AbstractConfiguration & config,
|
||||||
const std::string & config_prefix,
|
const std::string & config_prefix,
|
||||||
const DictionaryStructure & dict_struct,
|
const DictionaryStructure & dict_struct,
|
||||||
const Context & context) const
|
const Context & context) const
|
||||||
@ -54,15 +55,10 @@ public:
|
|||||||
}
|
}
|
||||||
else if (config.has(config_prefix + "mysql"))
|
else if (config.has(config_prefix + "mysql"))
|
||||||
{
|
{
|
||||||
throw Exception{
|
return ext::make_unique<MysqlDictionarySource>(config, config_prefix + "mysql.", sample_block, context);
|
||||||
"source.mysql not yet implemented",
|
|
||||||
ErrorCodes::NOT_IMPLEMENTED
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
throw Exception{
|
throw Exception{"unsupported source type"};
|
||||||
"unsupported source type"
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <DB/Core/ErrorCodes.h>
|
#include <DB/Core/ErrorCodes.h>
|
||||||
#include <Poco/Util/XMLConfiguration.h>
|
#include <Poco/Util/AbstractConfiguration.h>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
@ -22,7 +22,7 @@ struct DictionaryStructure
|
|||||||
std::string id_name;
|
std::string id_name;
|
||||||
std::vector<DictionaryAttribute> attributes;
|
std::vector<DictionaryAttribute> attributes;
|
||||||
|
|
||||||
static DictionaryStructure fromXML(const Poco::Util::XMLConfiguration & config, const std::string & config_prefix)
|
static DictionaryStructure fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
|
||||||
{
|
{
|
||||||
const auto & id_name = config.getString(config_prefix + ".id.name");
|
const auto & id_name = config.getString(config_prefix + ".id.name");
|
||||||
if (id_name.empty())
|
if (id_name.empty())
|
||||||
|
@ -7,10 +7,10 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
const auto max_block_size = 8192;
|
|
||||||
|
|
||||||
class FileDictionarySource final : public IDictionarySource
|
class FileDictionarySource final : public IDictionarySource
|
||||||
{
|
{
|
||||||
|
static const auto max_block_size = 8192;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block,
|
FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block,
|
||||||
const Context & context)
|
const Context & context)
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
#include <DB/Dictionaries/IDictionarySource.h>
|
#include <DB/Dictionaries/IDictionarySource.h>
|
||||||
#include <DB/Dictionaries/IDictionary.h>
|
#include <DB/Dictionaries/IDictionary.h>
|
||||||
#include <statdaemons/ext/range.hpp>
|
#include <statdaemons/ext/range.hpp>
|
||||||
#include <Poco/Util/XMLConfiguration.h>
|
#include <Poco/Util/AbstractConfiguration.h>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
@ -17,7 +17,7 @@ const auto max_array_size = 500000;
|
|||||||
class FlatDictionary final : public IDictionary
|
class FlatDictionary final : public IDictionary
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
FlatDictionary(const DictionaryStructure & dict_struct, const Poco::Util::XMLConfiguration & config,
|
FlatDictionary(const DictionaryStructure & dict_struct, const Poco::Util::AbstractConfiguration & config,
|
||||||
const std::string & config_prefix, DictionarySourcePtr source_ptr)
|
const std::string & config_prefix, DictionarySourcePtr source_ptr)
|
||||||
: source_ptr{std::move(source_ptr)}
|
: source_ptr{std::move(source_ptr)}
|
||||||
{
|
{
|
||||||
|
112
dbms/include/DB/Dictionaries/MysqlDictionarySource.h
Normal file
112
dbms/include/DB/Dictionaries/MysqlDictionarySource.h
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <DB/Interpreters/Context.h>
|
||||||
|
#include <DB/Dictionaries/DictionaryStructure.h>
|
||||||
|
#include <DB/Dictionaries/IDictionarySource.h>
|
||||||
|
#include <DB/Dictionaries/config_ptr_t.h>
|
||||||
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||||
|
#include <statdaemons/ext/range.hpp>
|
||||||
|
#include <mysqlxx/Pool.h>
|
||||||
|
#include <Poco/Util/AbstractConfiguration.h>
|
||||||
|
#include <Poco/Util/LayeredConfiguration.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
class MysqlBlockInputStream final : public IProfilingBlockInputStream
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
MysqlBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size)
|
||||||
|
: query{std::move(query)}, result{query.use()}, sample_block{sample_block}, max_block_size{max_block_size}
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
String getName() const override { return "MysqlBlockInputStream"; }
|
||||||
|
|
||||||
|
String getID() const override
|
||||||
|
{
|
||||||
|
return "Mysql(" + query.str() + ")";
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Block readImpl() override
|
||||||
|
{
|
||||||
|
auto block = sample_block.cloneEmpty();
|
||||||
|
|
||||||
|
std::size_t rows = 0;
|
||||||
|
while (auto row = result.fetch())
|
||||||
|
{
|
||||||
|
for (const auto idx : ext::range(0, row.size()))
|
||||||
|
/// @todo type switch to get the real value from row[idx]
|
||||||
|
block.getByPosition(idx).column->insert(Field{});
|
||||||
|
|
||||||
|
++rows;
|
||||||
|
if (rows == max_block_size)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return block;
|
||||||
|
}
|
||||||
|
|
||||||
|
mysqlxx::Query query;
|
||||||
|
mysqlxx::UseQueryResult result;
|
||||||
|
Block sample_block;
|
||||||
|
std::size_t max_block_size;
|
||||||
|
};
|
||||||
|
|
||||||
|
class MysqlDictionarySource final : public IDictionarySource
|
||||||
|
{
|
||||||
|
static const auto max_block_size = 8192;
|
||||||
|
|
||||||
|
public:
|
||||||
|
MysqlDictionarySource(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
|
||||||
|
Block & sample_block, const Context & context)
|
||||||
|
: layered_config_ptr{getLayeredConfig(config)},
|
||||||
|
pool{*layered_config_ptr, config_prefix},
|
||||||
|
sample_block{sample_block}, context(context) {}
|
||||||
|
|
||||||
|
private:
|
||||||
|
BlockInputStreamPtr loadAll() override
|
||||||
|
{
|
||||||
|
auto connection = pool.Get();
|
||||||
|
auto query = connection->query("SELECT 1+1;");
|
||||||
|
auto result = query.use();
|
||||||
|
while (auto row = result.fetch())
|
||||||
|
{
|
||||||
|
for (const auto idx : ext::range(0, row.size()))
|
||||||
|
std::cout << row[idx].getString() << ' ';
|
||||||
|
std::cout << std::endl;
|
||||||
|
}
|
||||||
|
return new MysqlBlockInputStream{pool.Get()->query(""), sample_block, max_block_size};
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockInputStreamPtr loadId(const std::uint64_t id) override
|
||||||
|
{
|
||||||
|
throw Exception{
|
||||||
|
"Method unsupported",
|
||||||
|
ErrorCodes::NOT_IMPLEMENTED
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
|
||||||
|
{
|
||||||
|
throw Exception{
|
||||||
|
"Method unsupported",
|
||||||
|
ErrorCodes::NOT_IMPLEMENTED
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
static config_ptr_t<Poco::Util::LayeredConfiguration> getLayeredConfig(Poco::Util::AbstractConfiguration & config)
|
||||||
|
{
|
||||||
|
config_ptr_t<Poco::Util::LayeredConfiguration> layered_config{new Poco::Util::LayeredConfiguration};
|
||||||
|
layered_config->add(&config);
|
||||||
|
return layered_config;
|
||||||
|
}
|
||||||
|
|
||||||
|
const config_ptr_t<Poco::Util::LayeredConfiguration> layered_config_ptr;
|
||||||
|
mysqlxx::Pool pool;
|
||||||
|
Block sample_block;
|
||||||
|
const Context & context;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
15
dbms/include/DB/Dictionaries/config_ptr_t.h
Normal file
15
dbms/include/DB/Dictionaries/config_ptr_t.h
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
template <typename T> struct release
|
||||||
|
{
|
||||||
|
void operator()(const T * const ptr) { ptr->release(); }
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename T> using config_ptr_t = std::unique_ptr<T, release<T>>;
|
||||||
|
|
||||||
|
}
|
@ -200,13 +200,6 @@ public:
|
|||||||
|
|
||||||
MultiVersion<IDictionary>::Version getExternalDictionary(const std::string & name) const
|
MultiVersion<IDictionary>::Version getExternalDictionary(const std::string & name) const
|
||||||
{
|
{
|
||||||
std::cout << "there are dictionaries: ";
|
|
||||||
std::transform(std::begin(external_dictionaries), std::end(external_dictionaries),
|
|
||||||
std::ostream_iterator<std::string>{std::cout, ", "},
|
|
||||||
[] (const std::pair<const std::string, std::shared_ptr<MultiVersion<IDictionary>>> & pair) {
|
|
||||||
return pair.first;
|
|
||||||
});
|
|
||||||
std::cout << std::endl;
|
|
||||||
const auto it = external_dictionaries.find(name);
|
const auto it = external_dictionaries.find(name);
|
||||||
if (it == std::end(external_dictionaries))
|
if (it == std::end(external_dictionaries))
|
||||||
throw Exception{
|
throw Exception{
|
||||||
|
@ -1,20 +1,11 @@
|
|||||||
#include <DB/Interpreters/Dictionaries.h>
|
#include <DB/Interpreters/Dictionaries.h>
|
||||||
#include <DB/Dictionaries/DictionaryFactory.h>
|
#include <DB/Dictionaries/DictionaryFactory.h>
|
||||||
#include <Poco/Util/XMLConfiguration.h>
|
#include <DB/Dictionaries/config_ptr_t.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
namespace
|
|
||||||
{
|
|
||||||
template <typename T> struct release
|
|
||||||
{
|
|
||||||
void operator()(const T * const ptr) { ptr->release(); }
|
|
||||||
};
|
|
||||||
template <typename T> using config_ptr_t = std::unique_ptr<T, release<T>>;
|
|
||||||
};
|
|
||||||
|
|
||||||
void Dictionaries::reloadExternals()
|
void Dictionaries::reloadExternals()
|
||||||
{
|
{
|
||||||
const std::lock_guard<std::mutex> lock{externals_mutex};
|
const std::lock_guard<std::mutex> lock{externals_mutex};
|
||||||
|
Loading…
Reference in New Issue
Block a user