diff --git a/src/Core/Block.cpp b/src/Core/Block.cpp index cd2855739e2..56c045bc8df 100644 --- a/src/Core/Block.cpp +++ b/src/Core/Block.cpp @@ -409,6 +409,15 @@ Block Block::cloneWithoutColumns() const return res; } +Block Block::cloneWithCutColumns(size_t start, size_t length) const +{ + Block copy = *this; + + for (size_t i = 0; i < copy.data.size(); ++i) + copy.data[i].column = copy.data[i].column->cut(start, length); + + return copy; +} Block Block::sortColumns() const { diff --git a/src/Core/Block.h b/src/Core/Block.h index f588373aaed..14f4f57caed 100644 --- a/src/Core/Block.h +++ b/src/Core/Block.h @@ -129,6 +129,8 @@ public: void setColumns(const Columns & columns); Block cloneWithColumns(const Columns & columns) const; Block cloneWithoutColumns() const; + /// Clone the block, replacing every column with the result of column->cut(start, length). + Block cloneWithCutColumns(size_t start, size_t length) const; /** Get empty columns with the same types as in block. */ MutableColumns cloneEmptyColumns() const; diff --git a/src/Dictionaries/DictionarySourceHelpers.cpp b/src/Dictionaries/DictionarySourceHelpers.cpp index 309bc64e179..3227704ff4b 100644 --- a/src/Dictionaries/DictionarySourceHelpers.cpp +++ b/src/Dictionaries/DictionarySourceHelpers.cpp @@ -1,6 +1,5 @@ #include "DictionarySourceHelpers.h" #include -#include #include #include #include @@ -13,44 +12,53 @@ namespace DB { -/// For simple key -void formatIDs(BlockOutputStreamPtr & out, const std::vector & ids) + +void formatWithBlock(BlockOutputStreamPtr & out, Block block) { - auto column = ColumnUInt64::create(ids.size()); - memcpy(column->getData().data(), ids.data(), ids.size() * sizeof(ids.front())); - - Block block{{std::move(column), std::make_shared(), "id"}}; - out->writePrefix(); out->write(block); out->writeSuffix(); out->flush(); } +/// For simple key: builds a block with a single ColumnUInt64 column named "id" that holds a copy of the given ids. + +Block blockForIds(const std::vector & ids) +{ + auto column = ColumnUInt64::create(ids.size()); + memcpy(column->getData().data(), ids.data(), ids.size() * sizeof(ids.front())); + + Block block{{std::move(column), 
std::make_shared(), "id"}}; + + return block; +} + /// For composite key -void formatKeys( + +Block blockForKeys( const DictionaryStructure & dict_struct, - BlockOutputStreamPtr & out, const Columns & key_columns, const std::vector & requested_rows) { Block block; + for (size_t i = 0, size = key_columns.size(); i < size; ++i) { const ColumnPtr & source_column = key_columns[i]; - auto filtered_column = source_column->cloneEmpty(); - filtered_column->reserve(requested_rows.size()); + size_t column_rows_size = source_column->size(); + + /// NOTE(review): rows are selected through a mask, so presumably the order and duplicates of requested_rows are not preserved (the previous insertFrom loop kept both) - confirm callers do not rely on them. + PaddedPODArray filter(column_rows_size, false); for (size_t idx : requested_rows) - filtered_column->insertFrom(*source_column, idx); + filter[idx] = true; - block.insert({std::move(filtered_column), (*dict_struct.key)[i].type, toString(i)}); + auto filtered_column = source_column->filter(filter, requested_rows.size()); + + block.insert({std::move(filtered_column), (*dict_struct.key)[i].type, (*dict_struct.key)[i].name}); } - out->writePrefix(); - out->write(block); - out->writeSuffix(); - out->flush(); + return block; } Context copyContextAndApplySettings( diff --git a/src/Dictionaries/DictionarySourceHelpers.h b/src/Dictionaries/DictionarySourceHelpers.h index 3f42700d336..d6b0dab8d4f 100644 --- a/src/Dictionaries/DictionarySourceHelpers.h +++ b/src/Dictionaries/DictionarySourceHelpers.h @@ -1,11 +1,15 @@ #pragma once #include -#include + #include + #include #include +#include +#include + namespace DB { class IBlockOutputStream; @@ -16,13 +20,16 @@ class Context; /// Write keys to block output stream. 
+void formatWithBlock(BlockOutputStreamPtr & out, Block block); + /// For simple key -void formatIDs(BlockOutputStreamPtr & out, const std::vector & ids); + +Block blockForIds(const std::vector & ids); /// For composite key -void formatKeys( + +Block blockForKeys( const DictionaryStructure & dict_struct, - BlockOutputStreamPtr & out, const Columns & key_columns, const std::vector & requested_rows); @@ -36,4 +43,5 @@ void applySettingsToContext( const std::string & config_prefix, Context & context, const Poco::Util::AbstractConfiguration & config); + } diff --git a/src/Dictionaries/DictionaryStructure.cpp b/src/Dictionaries/DictionaryStructure.cpp index 95c2e0a3e09..42a7cb6e4ec 100644 --- a/src/Dictionaries/DictionaryStructure.cpp +++ b/src/Dictionaries/DictionaryStructure.cpp @@ -281,6 +281,21 @@ size_t DictionaryStructure::getKeySize() const }); } +Strings DictionaryStructure::getKeysNames() const +{ + if (id) + return { id->name }; + + auto & key_attributes = *key; + + Strings keys_names; + keys_names.reserve(key_attributes.size()); + + for (const auto & key_attribute : key_attributes) + keys_names.emplace_back(key_attribute.name); + + return keys_names; +} static void checkAttributeKeys(const Poco::Util::AbstractConfiguration::Keys & keys) { diff --git a/src/Dictionaries/DictionaryStructure.h b/src/Dictionaries/DictionaryStructure.h index 945e1c55494..c6c80498a4b 100644 --- a/src/Dictionaries/DictionaryStructure.h +++ b/src/Dictionaries/DictionaryStructure.h @@ -158,6 +158,8 @@ struct DictionaryStructure final std::string getKeyDescription() const; bool isKeySizeFixed() const; size_t getKeySize() const; + Strings getKeysNames() const; + private: /// range_min and range_max have to be parsed before this function call std::vector getAttributes( diff --git a/src/Dictionaries/ExecutableDictionarySource.cpp b/src/Dictionaries/ExecutableDictionarySource.cpp index f2abe10f970..d9aa89ec14e 100644 --- a/src/Dictionaries/ExecutableDictionarySource.cpp +++ 
b/src/Dictionaries/ExecutableDictionarySource.cpp @@ -26,6 +26,7 @@ namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int DICTIONARY_ACCESS_DENIED; + extern const int UNSUPPORTED_METHOD; } namespace @@ -65,18 +66,32 @@ ExecutableDictionarySource::ExecutableDictionarySource( const Context & context_) : log(&Poco::Logger::get("ExecutableDictionarySource")) , dict_struct{dict_struct_} + , implicit_key{config.getBool(config_prefix + ".implicit_key", false)} , command{config.getString(config_prefix + ".command")} , update_field{config.getString(config_prefix + ".update_field", "")} , format{config.getString(config_prefix + ".format")} , sample_block{sample_block_} , context(context_) { + /// Remove key columns from sample_block for an implicit_key dictionary because + /// these columns will not be provided by the client + if (implicit_key) + { + auto keys_names = dict_struct.getKeysNames(); + + for (auto & key_name : keys_names) + { + size_t key_column_position_in_block = sample_block.getPositionByName(key_name); + sample_block.erase(key_column_position_in_block); + } + } } ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other) : log(&Poco::Logger::get("ExecutableDictionarySource")) , update_time{other.update_time} , dict_struct{other.dict_struct} + , implicit_key{other.implicit_key} , command{other.command} , update_field{other.update_field} , format{other.format} @@ -87,6 +102,9 @@ ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionar BlockInputStreamPtr ExecutableDictionarySource::loadAll() { + if (implicit_key) + throw Exception("ExecutableDictionarySource with implicit_key does not support loadAll method", ErrorCodes::UNSUPPORTED_METHOD); + LOG_TRACE(log, "loadAll {}", toString()); auto process = ShellCommand::execute(command); auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size); @@ -95,6 +113,9 @@ BlockInputStreamPtr 
ExecutableDictionarySource::loadAll() BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll() { + if (implicit_key) + throw Exception("ExecutableDictionarySource with implicit_key does not support loadUpdatedAll method", ErrorCodes::UNSUPPORTED_METHOD); + time_t new_update_time = time(nullptr); SCOPE_EXIT(update_time = new_update_time); @@ -173,6 +194,72 @@ namespace std::function send_data; ThreadFromGlobalPool thread; }; + + /** A stream that prepends additional columns to each block it reads from the inner stream. + * + * The number of rows in block_to_add must be equal to the total number of rows in all blocks read from the inner stream. + */ + class BlockInputStreamWithAdditionalColumns final: public IBlockInputStream + { + public: + BlockInputStreamWithAdditionalColumns( + Block block_to_add_, + std::unique_ptr&& stream_) + : block_to_add(std::move(block_to_add_)) + , stream(std::move(stream_)) + { + } + + Block getHeader() const override + { + auto header = stream->getHeader(); + + if (header) + { + for (int64_t i = static_cast(block_to_add.columns() - 1); i >= 0; --i) + header.insert(0, block_to_add.getByPosition(i).cloneEmpty()); + } + + return header; + } + + Block readImpl() override + { + auto block = stream->read(); + + if (block) + { + auto block_rows = block.rows(); + + auto cut_block = block_to_add.cloneWithCutColumns(current_range_index, block_rows); + + for (int64_t i = static_cast(cut_block.columns() - 1); i >= 0; --i) + block.insert(0, cut_block.getByPosition(i)); + + current_range_index += block_rows; + } + + return block; + } + + void readPrefix() override + { + stream->readPrefix(); + } + + void readSuffix() override + { + stream->readSuffix(); + } + + String getName() const override { return "BlockInputStreamWithAdditionalColumns"; } + + private: + Block block_to_add; + std::unique_ptr stream; + size_t current_range_index = 0; + }; + } @@ -180,28 +267,44 @@ BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector( + auto block = 
blockForIds(ids); + + auto stream = std::make_unique( context, format, sample_block, command, log, - [&ids, this](WriteBufferFromFile & out) mutable + [block, this](WriteBufferFromFile & out) mutable { - auto output_stream = context.getOutputStream(format, out, sample_block); - formatIDs(output_stream, ids); + auto output_stream = context.getOutputStream(format, out, block.cloneEmpty()); + formatWithBlock(output_stream, block); out.close(); }); + + if (implicit_key) + { + return std::make_shared(block, std::move(stream)); + } + else + return std::shared_ptr(stream.release()); } BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector & requested_rows) { LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size()); - return std::make_shared( + auto block = blockForKeys(dict_struct, key_columns, requested_rows); + + auto stream = std::make_unique( context, format, sample_block, command, log, - [key_columns, &requested_rows, this](WriteBufferFromFile & out) mutable + [block, this](WriteBufferFromFile & out) mutable { - auto output_stream = context.getOutputStream(format, out, sample_block); - formatKeys(dict_struct, output_stream, key_columns, requested_rows); + auto output_stream = context.getOutputStream(format, out, block.cloneEmpty()); + formatWithBlock(output_stream, block); out.close(); }); + + if (implicit_key) + return std::make_shared(block, std::move(stream)); + else + return std::shared_ptr(stream.release()); } bool ExecutableDictionarySource::isModified() const diff --git a/src/Dictionaries/ExecutableDictionarySource.h b/src/Dictionaries/ExecutableDictionarySource.h index f28d71ca5e3..7aa203f267b 100644 --- a/src/Dictionaries/ExecutableDictionarySource.h +++ b/src/Dictionaries/ExecutableDictionarySource.h @@ -49,9 +49,9 @@ public: private: Poco::Logger * log; - time_t update_time = 0; const DictionaryStructure dict_struct; + bool implicit_key; const std::string command; const std::string 
update_field; const std::string format; diff --git a/src/Dictionaries/HTTPDictionarySource.cpp b/src/Dictionaries/HTTPDictionarySource.cpp index da5623bcdb0..55585b836d9 100644 --- a/src/Dictionaries/HTTPDictionarySource.cpp +++ b/src/Dictionaries/HTTPDictionarySource.cpp @@ -130,12 +130,14 @@ BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll() BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector & ids) { LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size()); + + auto block = blockForIds(ids); - ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr) + ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr) { WriteBufferFromOStream out_buffer(ostr); auto output_stream = context.getOutputStream(format, out_buffer, sample_block); - formatIDs(output_stream, ids); + formatWithBlock(output_stream, block); }; Poco::URI uri(url); @@ -150,11 +152,13 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns, { LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size()); - ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr) + auto block = blockForKeys(dict_struct, key_columns, requested_rows); + + ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr) { WriteBufferFromOStream out_buffer(ostr); auto output_stream = context.getOutputStream(format, out_buffer, sample_block); - formatKeys(dict_struct, output_stream, key_columns, requested_rows); + formatWithBlock(output_stream, block); }; Poco::URI uri(url); diff --git a/tests/config/executable_dictionary.xml b/tests/config/executable_dictionary.xml index 50df32e2ec6..c5a4a0947bc 100644 --- a/tests/config/executable_dictionary.xml +++ b/tests/config/executable_dictionary.xml @@ -105,4 +105,152 @@ + + simple_executable_cache_dictionary_no_implicit_key + + + + id + UInt64 + + + + value + String + + + + + 
+ + echo "1\tValue" + TabSeparated + false + + + + + + 10000 + + + + 300 + + + + simple_executable_cache_dictionary_implicit_key + + + + id + UInt64 + + + + value + String + + + + + + + echo "Value" + TabSeparated + true + + + + + + 10000 + + + + 300 + + + + complex_executable_cache_dictionary_no_implicit_key + + + + + id + UInt64 + + + + id_key + String + + + + + + value + String + + + + + + + echo "1\tFirstKey\tValue" + TabSeparated + false + + + + + + 10000 + + + + 300 + + + + complex_executable_cache_dictionary_implicit_key + + + + + id + UInt64 + + + + id_key + String + + + + + + value + String + + + + + + + echo "Value" + TabSeparated + true + + + + + + 10000 + + + + 300 + + diff --git a/tests/queries/0_stateless/01474_executable_dictionary.reference b/tests/queries/0_stateless/01474_executable_dictionary.reference index 4d0994b08c3..66894d4f41f 100644 --- a/tests/queries/0_stateless/01474_executable_dictionary.reference +++ b/tests/queries/0_stateless/01474_executable_dictionary.reference @@ -1,3 +1,8 @@ 999999 1999998 999998000001 999999 1999998 999998000001 999999 1999998 999998000001 +Check implicit_key option +Value +Value +Value +Value \ No newline at end of file diff --git a/tests/queries/0_stateless/01474_executable_dictionary.sql b/tests/queries/0_stateless/01474_executable_dictionary.sql index 727cf47f79f..83f8a946354 100644 --- a/tests/queries/0_stateless/01474_executable_dictionary.sql +++ b/tests/queries/0_stateless/01474_executable_dictionary.sql @@ -1,3 +1,11 @@ SELECT number, dictGet('executable_complex', 'a', (number, number)) AS a, dictGet('executable_complex', 'b', (number, number)) AS b FROM numbers(1000000) WHERE number = 999999; SELECT number, dictGet('executable_complex_direct', 'a', (number, number)) AS a, dictGet('executable_complex_direct', 'b', (number, number)) AS b FROM numbers(1000000) WHERE number = 999999; SELECT number, dictGet('executable_simple', 'a', number) AS a, dictGet('executable_simple', 'b', number) AS b FROM 
numbers(1000000) WHERE number = 999999; + +SELECT 'Check implicit_key option'; + +SELECT dictGet('simple_executable_cache_dictionary_no_implicit_key', 'value', toUInt64(1)); +SELECT dictGet('simple_executable_cache_dictionary_implicit_key', 'value', toUInt64(1)); + +SELECT dictGet('complex_executable_cache_dictionary_no_implicit_key', 'value', (toUInt64(1), 'FirstKey')); +SELECT dictGet('complex_executable_cache_dictionary_implicit_key', 'value', (toUInt64(1), 'FirstKey'));