diff --git a/dbms/include/DB/Columns/ColumnVector.h b/dbms/include/DB/Columns/ColumnVector.h index c391b4dea3e..078a474be2d 100644 --- a/dbms/include/DB/Columns/ColumnVector.h +++ b/dbms/include/DB/Columns/ColumnVector.h @@ -155,6 +155,11 @@ public: return data.size() * sizeof(data[0]); } + void insert(const T value) + { + data.push_back(value); + } + int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override { return CompareHelper::compare(data[n], static_cast(rhs_).data[m], nan_direction_hint); diff --git a/dbms/include/DB/Common/PODArray.h b/dbms/include/DB/Common/PODArray.h index 222dfa39ec5..78510986b08 100644 --- a/dbms/include/DB/Common/PODArray.h +++ b/dbms/include/DB/Common/PODArray.h @@ -253,7 +253,7 @@ public: if (n > old_size) { reserve(n); - std::fill(t_end(), reinterpret_cast(c_end + n - old_size), value); + std::fill(t_end(), t_end() + n - old_size, value); } c_end = c_start + byte_size(n); } diff --git a/dbms/include/DB/Dictionaries/CacheDictionary.h b/dbms/include/DB/Dictionaries/CacheDictionary.h index 939c5fdf6a2..dbaa4118f2c 100644 --- a/dbms/include/DB/Dictionaries/CacheDictionary.h +++ b/dbms/include/DB/Dictionaries/CacheDictionary.h @@ -455,7 +455,10 @@ private: } cell.id = id; - cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; + if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) + cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; + else + cell.expires_at = std::chrono::time_point::max(); on_cell_updated(id, cell_idx); remaining_ids[id] = 1; @@ -477,7 +480,10 @@ private: setDefaultAttributeValue(attribute, cell_idx); cell.id = id; - cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; + if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) + cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; + else + cell.expires_at = std::chrono::time_point::max(); on_cell_updated(id, cell_idx); } diff --git a/dbms/include/DB/Dictionaries/DictionaryStructure.h b/dbms/include/DB/Dictionaries/DictionaryStructure.h index 6f46010e0d3..5a02bb35928 100644 --- a/dbms/include/DB/Dictionaries/DictionaryStructure.h +++ b/dbms/include/DB/Dictionaries/DictionaryStructure.h @@ -144,8 +144,18 @@ struct DictionaryStructure const auto null_value_string = config.getString(prefix + "null_value"); Field null_value; - ReadBufferFromString null_value_buffer{null_value_string}; - type->deserializeText(null_value, null_value_buffer); + try + { + ReadBufferFromString null_value_buffer{null_value_string}; + type->deserializeText(null_value, null_value_buffer); + } + catch (const std::exception & e) + { + throw Exception{ + std::string{"Error parsing null_value: "} + e.what(), + ErrorCodes::BAD_ARGUMENTS + }; + } const auto hierarchical = config.getBool(prefix + "hierarchical", false); const auto injective = config.getBool(prefix + "injective", false); diff --git a/dbms/include/DB/Dictionaries/FlatDictionary.h b/dbms/include/DB/Dictionaries/FlatDictionary.h index 756255d1bf6..34420baa099 100644 --- a/dbms/include/DB/Dictionaries/FlatDictionary.h +++ b/dbms/include/DB/Dictionaries/FlatDictionary.h @@ -262,7 +262,7 @@ private: { auto & array = *std::get>>(attribute.arrays); if (id >= array.size()) - array.resize_fill(id, std::get(attribute.null_values)); + array.resize_fill(id + 1, std::get(attribute.null_values)); array[id] = value; } @@ -290,7 +290,7 @@ private: { auto & array = *std::get>>(attribute.arrays); if (id >= array.size()) - array.resize_fill(id, std::get(attribute.null_values)); + array.resize_fill(id + 1, std::get(attribute.null_values)); const auto & string = value.get(); const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size()); array[id] = StringRef{string_in_arena, string.size()}; diff --git a/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h b/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h index d8a14e3c988..d7cea365cf2 100644 --- a/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h +++ b/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,13 @@ public: : entry{entry}, query{this->entry->query(query_str)}, result{query.use()}, sample_block{sample_block}, max_block_size{max_block_size} { + if (sample_block.columns() != result.getNumFields()) + throw Exception{ + "mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " + + toString(sample_block.columns()) + " expected", + ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH + }; + types.reserve(sample_block.columns()); for (const auto idx : ext::range(0, sample_block.columns())) @@ -91,48 +99,82 @@ public: private: Block readImpl() override { + auto row = result.fetch(); + if (!row) + return {}; + auto block = sample_block.cloneEmpty(); - if (block.columns() != result.getNumFields()) - throw Exception{ - "mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " + - toString(block.columns()) + " expected", - ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH - }; + /// cache pointers returned by the calls to getByPosition + std::vector columns(block.columns()); + for (const auto i : ext::range(0, columns.size())) + columns[i] = block.getByPosition(i).column.get(); - std::size_t rows = 0; - while (auto row = result.fetch()) + std::size_t num_rows = 0; + while (row) { - /// @todo cache pointers returned by the calls to getByPosition for (const auto idx : ext::range(0, row.size())) - insertValue(block.getByPosition(idx).column, row[idx], types[idx]); + { + const auto value = row[idx]; + if (!value.isNull()) + insertValue(columns[idx], types[idx], value); + else + insertDefaultValue(columns[idx], types[idx]); + } - ++rows; - if (rows == max_block_size) + ++num_rows; + if (num_rows == max_block_size) break; + + row = result.fetch(); } - return rows == 0 ? Block{} : block; - }; + return block; + } - static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const value_type_t type) + static void insertValue(IColumn * const column, const value_type_t type, const mysqlxx::Value & value) { switch (type) { - case value_type_t::UInt8: column->insert(static_cast(value)); break; - case value_type_t::UInt16: column->insert(static_cast(value)); break; - case value_type_t::UInt32: column->insert(static_cast(value)); break; - case value_type_t::UInt64: column->insert(static_cast(value)); break; - case value_type_t::Int8: column->insert(static_cast(value)); break; - case value_type_t::Int16: column->insert(static_cast(value)); break; - case value_type_t::Int32: column->insert(static_cast(value)); break; - case value_type_t::Int64: column->insert(static_cast(value)); break; - case value_type_t::Float32: column->insert(static_cast(value)); break; - case value_type_t::Float64: column->insert(static_cast(value)); break; - case value_type_t::String: column->insert(value.getString()); break; - case value_type_t::Date: column->insert(static_cast(UInt16{value.getDate().getDayNum()})); break; - case value_type_t::DateTime: column->insert(static_cast(time_t{value.getDateTime()})); break; - }; + case value_type_t::UInt8: static_cast(column)->insert(value.getUInt()); break; + case value_type_t::UInt16: static_cast(column)->insert(value.getUInt()); break; + case value_type_t::UInt32: static_cast(column)->insert(value.getUInt()); break; + case value_type_t::UInt64: static_cast(column)->insert(value.getUInt()); break; + case value_type_t::Int8: static_cast(column)->insert(value.getInt()); break; + case value_type_t::Int16: static_cast(column)->insert(value.getInt()); break; + case value_type_t::Int32: static_cast(column)->insert(value.getInt()); break; + case value_type_t::Int64: static_cast(column)->insert(value.getInt()); break; + case value_type_t::Float32: static_cast(column)->insert(value.getDouble()); break; + case value_type_t::Float64: static_cast(column)->insert(value.getDouble()); break; + case value_type_t::String: + { + const auto string = value.getString(); + static_cast(column)->insertDataWithTerminatingZero(string.data(), string.size() + 1); + break; + } + case value_type_t::Date: static_cast(column)->insert(UInt16{value.getDate().getDayNum()}); break; + case value_type_t::DateTime: static_cast(column)->insert(time_t{value.getDateTime()}); break; + } + } + + static void insertDefaultValue(IColumn * const column, const value_type_t type) + { + switch (type) + { + case value_type_t::UInt8: static_cast(column)->insertDefault(); break; + case value_type_t::UInt16: static_cast(column)->insertDefault(); break; + case value_type_t::UInt32: static_cast(column)->insertDefault(); break; + case value_type_t::UInt64: static_cast(column)->insertDefault(); break; + case value_type_t::Int8: static_cast(column)->insertDefault(); break; + case value_type_t::Int16: static_cast(column)->insertDefault(); break; + case value_type_t::Int32: static_cast(column)->insertDefault(); break; + case value_type_t::Int64: static_cast(column)->insertDefault(); break; + case value_type_t::Float32: static_cast(column)->insertDefault(); break; + case value_type_t::Float64: static_cast(column)->insertDefault(); break; + case value_type_t::String: static_cast(column)->insertDefault(); break; + case value_type_t::Date: static_cast(column)->insertDefault(); break; + case value_type_t::DateTime: static_cast(column)->insertDefault(); break; + } } mysqlxx::PoolWithFailover::Entry entry; diff --git a/dbms/include/DB/Dictionaries/TypeCheckingBlockInputStream.h b/dbms/include/DB/Dictionaries/TypeCheckingBlockInputStream.h new file mode 100644 index 00000000000..2ef7f3f86b6 --- /dev/null +++ b/dbms/include/DB/Dictionaries/TypeCheckingBlockInputStream.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +namespace DB +{ + +class TypeCheckingBlockInputStream : public IBlockInputStream +{ +public: + TypeCheckingBlockInputStream(const BlockInputStreamPtr & source, Block sample_block) + { + + } + +}; + +} diff --git a/dbms/src/Interpreters/ExternalDictionaries.cpp b/dbms/src/Interpreters/ExternalDictionaries.cpp index 922ad484e0b..589e2e9496d 100644 --- a/dbms/src/Interpreters/ExternalDictionaries.cpp +++ b/dbms/src/Interpreters/ExternalDictionaries.cpp @@ -132,7 +132,10 @@ void ExternalDictionaries::reloadFromFile(const std::string & config_path) if (0 != strncmp(key.data(), "dictionary", strlen("dictionary"))) { - LOG_WARNING(log, config_path << ": unknown node in dictionaries file: '" << key + "', 'dictionary'"); + if (0 != strncmp(key.data(), "comment", strlen("comment"))) + LOG_WARNING(log, + config_path << ": unknown node in dictionaries file: '" << key + "', 'dictionary'"); + continue; }