From 1b4b0d5779bd08b2aa7cab37d8a876b864fed6e2 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 26 Jan 2015 19:53:44 +0300 Subject: [PATCH] dbms: MysqlDictionarySource with draft MysqlBockInputStream --- .../DB/Dictionaries/DictionaryFactory.h | 4 +- .../DB/Dictionaries/DictionarySourceFactory.h | 12 +- .../DB/Dictionaries/DictionaryStructure.h | 4 +- .../DB/Dictionaries/FileDictionarySource.h | 4 +- dbms/include/DB/Dictionaries/FlatDictionary.h | 4 +- .../DB/Dictionaries/MysqlDictionarySource.h | 112 ++++++++++++++++++ dbms/include/DB/Dictionaries/config_ptr_t.h | 15 +++ dbms/include/DB/Interpreters/Dictionaries.h | 7 -- dbms/src/Interpreters/Dictionaries.cpp | 11 +- 9 files changed, 140 insertions(+), 33 deletions(-) create mode 100644 dbms/include/DB/Dictionaries/MysqlDictionarySource.h create mode 100644 dbms/include/DB/Dictionaries/config_ptr_t.h diff --git a/dbms/include/DB/Dictionaries/DictionaryFactory.h b/dbms/include/DB/Dictionaries/DictionaryFactory.h index 1b531cdc332..6a5836ab320 100644 --- a/dbms/include/DB/Dictionaries/DictionaryFactory.h +++ b/dbms/include/DB/Dictionaries/DictionaryFactory.h @@ -13,10 +13,10 @@ namespace DB class DictionaryFactory : public Singleton { public: - DictionaryPtr create(const Poco::Util::XMLConfiguration & config, const std::string & config_prefix, + DictionaryPtr create(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const Context & context) const { - auto dict_struct = DictionaryStructure::fromXML(config, config_prefix + "structure"); + auto dict_struct = DictionaryStructure::fromConfig(config, config_prefix + "structure"); auto source_ptr = DictionarySourceFactory::instance().create( config, config_prefix + "source.", dict_struct, context); diff --git a/dbms/include/DB/Dictionaries/DictionarySourceFactory.h b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h index 91073abc964..63ccadcc005 100644 --- a/dbms/include/DB/Dictionaries/DictionarySourceFactory.h +++ b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -39,7 +40,7 @@ Block createSampleBlock(const DictionaryStructure & dict_struct, const Context & class DictionarySourceFactory : public Singleton { public: - DictionarySourcePtr create(const Poco::Util::AbstractConfiguration & config, + DictionarySourcePtr create(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DictionaryStructure & dict_struct, const Context & context) const @@ -54,15 +55,10 @@ public: } else if (config.has(config_prefix + "mysql")) { - throw Exception{ - "source.mysql not yet implemented", - ErrorCodes::NOT_IMPLEMENTED - }; + return ext::make_unique(config, config_prefix + "mysql.", sample_block, context); } - throw Exception{ - "unsupported source type" - }; + throw Exception{"unsupported source type"}; } }; diff --git a/dbms/include/DB/Dictionaries/DictionaryStructure.h b/dbms/include/DB/Dictionaries/DictionaryStructure.h index 9720f5d87d8..cf7b68b644a 100644 --- a/dbms/include/DB/Dictionaries/DictionaryStructure.h +++ b/dbms/include/DB/Dictionaries/DictionaryStructure.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include @@ -22,7 +22,7 @@ struct DictionaryStructure std::string id_name; std::vector attributes; - static DictionaryStructure fromXML(const Poco::Util::XMLConfiguration & config, const std::string & config_prefix) + static DictionaryStructure fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) { const auto & id_name = config.getString(config_prefix + ".id.name"); if (id_name.empty()) diff --git a/dbms/include/DB/Dictionaries/FileDictionarySource.h b/dbms/include/DB/Dictionaries/FileDictionarySource.h index 0218a663979..0502fbab4f1 100644 --- a/dbms/include/DB/Dictionaries/FileDictionarySource.h +++ b/dbms/include/DB/Dictionaries/FileDictionarySource.h @@ -7,10 +7,10 @@ namespace DB { -const auto max_block_size = 8192; - class FileDictionarySource final : public IDictionarySource { + static const auto max_block_size = 8192; + public: FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block, const Context & context) diff --git a/dbms/include/DB/Dictionaries/FlatDictionary.h b/dbms/include/DB/Dictionaries/FlatDictionary.h index 9e3a8cf8d08..221414cf904 100644 --- a/dbms/include/DB/Dictionaries/FlatDictionary.h +++ b/dbms/include/DB/Dictionaries/FlatDictionary.h @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include @@ -17,7 +17,7 @@ const auto max_array_size = 500000; class FlatDictionary final : public IDictionary { public: - FlatDictionary(const DictionaryStructure & dict_struct, const Poco::Util::XMLConfiguration & config, + FlatDictionary(const DictionaryStructure & dict_struct, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, DictionarySourcePtr source_ptr) : source_ptr{std::move(source_ptr)} { diff --git a/dbms/include/DB/Dictionaries/MysqlDictionarySource.h b/dbms/include/DB/Dictionaries/MysqlDictionarySource.h new file mode 100644 index 00000000000..f2448decc55 --- /dev/null +++ b/dbms/include/DB/Dictionaries/MysqlDictionarySource.h @@ -0,0 +1,112 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class MysqlBlockInputStream final : public IProfilingBlockInputStream +{ +public: + MysqlBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size) + : query{std::move(query)}, result{query.use()}, sample_block{sample_block}, max_block_size{max_block_size} + { + } + + String getName() const override { return "MysqlBlockInputStream"; } + + String getID() const override + { + return "Mysql(" + query.str() + ")"; + } + +private: + Block readImpl() override + { + auto block = sample_block.cloneEmpty(); + + std::size_t rows = 0; + while (auto row = result.fetch()) + { + for (const auto idx : ext::range(0, row.size())) + /// @todo type switch to get the real value from row[idx] + block.getByPosition(idx).column->insert(Field{}); + + ++rows; + if (rows == max_block_size) + break; + } + + return block; + } + + mysqlxx::Query query; + mysqlxx::UseQueryResult result; + Block sample_block; + std::size_t max_block_size; +}; + +class MysqlDictionarySource final : public IDictionarySource +{ + static const auto max_block_size = 8192; + +public: + MysqlDictionarySource(Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, + Block & sample_block, const Context & context) + : layered_config_ptr{getLayeredConfig(config)}, + pool{*layered_config_ptr, config_prefix}, + sample_block{sample_block}, context(context) {} + +private: + BlockInputStreamPtr loadAll() override + { + auto connection = pool.Get(); + auto query = connection->query("SELECT 1+1;"); + auto result = query.use(); + while (auto row = result.fetch()) + { + for (const auto idx : ext::range(0, row.size())) + std::cout << row[idx].getString() << ' '; + std::cout << std::endl; + } + return new MysqlBlockInputStream{pool.Get()->query(""), sample_block, max_block_size}; + } + + BlockInputStreamPtr loadId(const std::uint64_t id) override + { + throw Exception{ + "Method unsupported", + ErrorCodes::NOT_IMPLEMENTED + }; + } + + BlockInputStreamPtr loadIds(const std::vector ids) override + { + throw Exception{ + "Method unsupported", + ErrorCodes::NOT_IMPLEMENTED + }; + } + + static config_ptr_t getLayeredConfig(Poco::Util::AbstractConfiguration & config) + { + config_ptr_t layered_config{new Poco::Util::LayeredConfiguration}; + layered_config->add(&config); + return layered_config; + } + + const config_ptr_t layered_config_ptr; + mysqlxx::Pool pool; + Block sample_block; + const Context & context; +}; + +} diff --git a/dbms/include/DB/Dictionaries/config_ptr_t.h b/dbms/include/DB/Dictionaries/config_ptr_t.h new file mode 100644 index 00000000000..f5f29792cc7 --- /dev/null +++ b/dbms/include/DB/Dictionaries/config_ptr_t.h @@ -0,0 +1,15 @@ +#pragma once + +#include + +namespace DB +{ + +template struct release +{ + void operator()(const T * const ptr) { ptr->release(); } +}; + +template using config_ptr_t = std::unique_ptr>; + +} diff --git a/dbms/include/DB/Interpreters/Dictionaries.h b/dbms/include/DB/Interpreters/Dictionaries.h index 41d59eb2f1d..7ad53a4844e 100644 --- a/dbms/include/DB/Interpreters/Dictionaries.h +++ b/dbms/include/DB/Interpreters/Dictionaries.h @@ -200,13 +200,6 @@ public: MultiVersion::Version getExternalDictionary(const std::string & name) const { - std::cout << "there are dictionaries: "; - std::transform(std::begin(external_dictionaries), std::end(external_dictionaries), - std::ostream_iterator{std::cout, ", "}, - [] (const std::pair>> & pair) { - return pair.first; - }); - std::cout << std::endl; const auto it = external_dictionaries.find(name); if (it == std::end(external_dictionaries)) throw Exception{ diff --git a/dbms/src/Interpreters/Dictionaries.cpp b/dbms/src/Interpreters/Dictionaries.cpp index f99f487b62c..93b46c6ffe5 100644 --- a/dbms/src/Interpreters/Dictionaries.cpp +++ b/dbms/src/Interpreters/Dictionaries.cpp @@ -1,20 +1,11 @@ #include #include -#include +#include namespace DB { -namespace -{ - template struct release - { - void operator()(const T * const ptr) { ptr->release(); } - }; - template using config_ptr_t = std::unique_ptr>; -}; - void Dictionaries::reloadExternals() { const std::lock_guard lock{externals_mutex};