From 63eaa97086b169e2ea9dd43b2b109699533665b9 Mon Sep 17 00:00:00 2001 From: proller Date: Fri, 18 Nov 2016 04:48:13 +0300 Subject: [PATCH] wip --- .../DB/Dictionaries/DictionarySourceFactory.h | 4 +- .../Dictionaries/ExecutableDictionarySource.h | 7 +- .../DB/Dictionaries/HTTPDictionarySource.h | 5 +- .../ExecutableDictionarySource.cpp | 96 +++++++++++++++---- .../src/Dictionaries/HTTPDictionarySource.cpp | 19 ++-- 5 files changed, 91 insertions(+), 40 deletions(-) diff --git a/dbms/include/DB/Dictionaries/DictionarySourceFactory.h b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h index ad3d3038828..52caf2d085e 100644 --- a/dbms/include/DB/Dictionaries/DictionarySourceFactory.h +++ b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h @@ -139,7 +139,7 @@ public: const auto name = config.getString(config_prefix + ".executable.name"); const auto format = config.getString(config_prefix + ".executable.format"); - return std::make_unique(name, format, sample_block, context); + return std::make_unique(dict_struct, name, format, sample_block, context); } else if ("http" == source_type) @@ -149,7 +149,7 @@ public: "Dictionary source of type `http` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR}; - return std::make_unique(config, config_prefix + ".http", sample_block, context); + return std::make_unique(dict_struct, config, config_prefix + ".http", sample_block, context); } throw Exception{ diff --git a/dbms/include/DB/Dictionaries/ExecutableDictionarySource.h b/dbms/include/DB/Dictionaries/ExecutableDictionarySource.h index 1ef36f2fb01..01865c79b44 100644 --- a/dbms/include/DB/Dictionaries/ExecutableDictionarySource.h +++ b/dbms/include/DB/Dictionaries/ExecutableDictionarySource.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB { @@ -13,7 +14,7 @@ class ExecutableDictionarySource final : public IDictionarySource public: - ExecutableDictionarySource(const std::string & name, const std::string & format, Block & sample_block, + ExecutableDictionarySource(const DictionaryStructure & dict_struct_, const std::string & name, const std::string & format, Block & sample_block, const Context & context); ExecutableDictionarySource(const ExecutableDictionarySource & other); @@ -36,13 +37,11 @@ public: private: Logger * log = &Logger::get("ExecutableDictionarySource"); - LocalDateTime getLastModification() const; - + const DictionaryStructure dict_struct; const std::string name; const std::string format; Block sample_block; const Context & context; - LocalDateTime last_modification; }; } diff --git a/dbms/include/DB/Dictionaries/HTTPDictionarySource.h b/dbms/include/DB/Dictionaries/HTTPDictionarySource.h index 5f49ad7942f..1ba5e2271ad 100644 --- a/dbms/include/DB/Dictionaries/HTTPDictionarySource.h +++ b/dbms/include/DB/Dictionaries/HTTPDictionarySource.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB { @@ -13,7 +14,7 @@ class HTTPDictionarySource final : public IDictionarySource public: - HTTPDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block, + HTTPDictionarySource(const DictionaryStructure & dict_struct_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block, const Context & context); HTTPDictionarySource(const HTTPDictionarySource & other); @@ -41,6 +42,7 @@ private: LocalDateTime getLastModification() const; + const DictionaryStructure dict_struct; const std::string host; int port; const std::string path; @@ -50,7 +52,6 @@ private: Block sample_block; const Context & context; const std::string load_all_query; - LocalDateTime last_modification; }; } diff --git a/dbms/src/Dictionaries/ExecutableDictionarySource.cpp b/dbms/src/Dictionaries/ExecutableDictionarySource.cpp index 4f90c810bb9..60ea8cb0560 100644 --- a/dbms/src/Dictionaries/ExecutableDictionarySource.cpp +++ b/dbms/src/Dictionaries/ExecutableDictionarySource.cpp @@ -4,26 +4,34 @@ #include #include +//#include +#include +#include + namespace DB { -ExecutableDictionarySource::ExecutableDictionarySource(const std::string & name, const std::string & format, Block & sample_block, const Context & context) - : name{name}, format{format}, sample_block{sample_block}, context(context) +ExecutableDictionarySource::ExecutableDictionarySource(const DictionaryStructure & dict_struct_, const std::string & name, const std::string & format, Block & sample_block, const Context & context) : + dict_struct{dict_struct_}, + name{name}, + format{format}, + sample_block{sample_block}, + context(context) { - last_modification = std::time(nullptr); } -ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other) - : name{other.name}, +ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other) : + dict_struct{other.dict_struct}, + name{other.name}, format{other.format}, - sample_block{other.sample_block}, context(other.context), - last_modification{other.last_modification} + sample_block{other.sample_block}, + context(other.context) { } BlockInputStreamPtr ExecutableDictionarySource::loadAll() { - last_modification = getLastModification(); + //std::cerr << "ExecutableDictionarySource::loadAll " <out, sample_block, max_block_size); @@ -32,20 +40,39 @@ BlockInputStreamPtr ExecutableDictionarySource::loadAll() BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector & ids) { + //std::cerr << "ExecutableDictionarySource::loadIds s=" << ids.size() <(); + column.column = column.type->createColumn(); + + for (auto & id : ids) { + column.column->insert(id); //maybe faster? + } + + Block block; + block.insert(std::move(column)); + + auto stream_out = context.getOutputFormat(format, process->in, sample_block); + stream_out->write(block); + } + +/* for (auto & id : ids) { writeString(std::to_string(id), process->in); - writeString("\n", process->in); // TODO: format? + writeString("\n", process->in); } +*/ process->in.close(); - /* +/* std::string process_err; readStringUntilEOF(process_err, process->err); std::cerr << "readed ERR [" << process_err << "] " << std::endl; - */ +*/ auto stream = context.getInputFormat( format, process->out, sample_block, max_block_size); return std::make_shared>(stream, std::move(process)); @@ -54,13 +81,48 @@ BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector & requested_rows) { -std::cerr << "ExecutableDictionarySource::loadKeys " << requested_rows.size() << std::endl; - throw Exception{"Method unsupported", ErrorCodes::NOT_IMPLEMENTED}; + //std::cerr << " ExecutableDictionarySource::loadKeys cols=" << key_columns.size() << " rows=" <(); // TODO TYPE + //column.column = column.type->createColumn(); + //column.column.reset(const_cast(key)); + column.column = key->clone(); // wrong! + //column.column = key->convertToFullColumnIfConst(); // check! + block.insert(std::move(column)); + } + + auto stream_out = context.getOutputFormat(format, process->in, sample_block); + stream_out->write(block); + + } +/* + for (const auto row : requested_rows) + { + writeString(std::to_string(row), process->in); + writeString("\n", process->in); // TODO: format? + } +*/ + + process->in.close(); + +/* + std::string process_err; + readStringUntilEOF(process_err, process->err); + std::cerr << "readed ERR [" << process_err << "] " << std::endl; +*/ + + auto stream = context.getInputFormat( format, process->out, sample_block, max_block_size); + return std::make_shared>(stream, std::move(process)); } bool ExecutableDictionarySource::isModified() const { - return getLastModification() > last_modification; + return true; } bool ExecutableDictionarySource::supportsSelectiveLoad() const @@ -78,10 +140,4 @@ std::string ExecutableDictionarySource::toString() const return "Executable: " + name; } -LocalDateTime ExecutableDictionarySource::getLastModification() const -{ - return last_modification; -} - - } diff --git a/dbms/src/Dictionaries/HTTPDictionarySource.cpp b/dbms/src/Dictionaries/HTTPDictionarySource.cpp index 16f23260e54..0a28c2bdf0c 100644 --- a/dbms/src/Dictionaries/HTTPDictionarySource.cpp +++ b/dbms/src/Dictionaries/HTTPDictionarySource.cpp @@ -8,7 +8,8 @@ namespace DB { -HTTPDictionarySource::HTTPDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block, const Context & context) : +HTTPDictionarySource::HTTPDictionarySource(const DictionaryStructure & dict_struct_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, Block & sample_block, const Context & context) : + dict_struct{dict_struct_}, host{config.getString(config_prefix + ".host")}, port{std::stoi(config.getString(config_prefix + ".port"))}, path{config.getString(config_prefix + ".path")}, @@ -17,24 +18,23 @@ HTTPDictionarySource::HTTPDictionarySource(const Poco::Util::AbstractConfigurati sample_block{sample_block}, context(context) { - last_modification = std::time(nullptr); - } HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other) : + dict_struct{other.dict_struct}, host{other.host}, port{other.port}, path{other.path}, format{other.format}, - sample_block{other.sample_block}, context(other.context), - last_modification{other.last_modification} + sample_block{other.sample_block}, + context(other.context) { } BlockInputStreamPtr HTTPDictionarySource::loadAll() { auto in_ptr = std::make_unique(host, port, path, ReadBufferFromHTTP::Params(), method); - auto stream = context.getInputFormat( format, *in_ptr, sample_block, max_block_size); + auto stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); return std::make_shared(stream, std::move(in_ptr)); } @@ -51,7 +51,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys( bool HTTPDictionarySource::isModified() const { - return getLastModification() > last_modification; + return true; } bool HTTPDictionarySource::supportsSelectiveLoad() const @@ -69,9 +69,4 @@ std::string HTTPDictionarySource::toString() const return "http://" + host + ":" + std::to_string(port) + "/" + path; } -LocalDateTime HTTPDictionarySource::getLastModification() const -{ - return last_modification; -} - }