From b81ef372d3f4fe84fadf016669a7450f95309816 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 23 Mar 2015 16:29:32 +0300 Subject: [PATCH 1/7] dbms: ColumnVector: add insert(T) function to avoid Field instantiation. [#METR-13298] --- dbms/include/DB/Columns/ColumnVector.h | 5 +++++ .../TypeCheckingBlockInputStream.h | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+) create mode 100644 dbms/include/DB/Dictionaries/TypeCheckingBlockInputStream.h diff --git a/dbms/include/DB/Columns/ColumnVector.h b/dbms/include/DB/Columns/ColumnVector.h index c391b4dea3e..078a474be2d 100644 --- a/dbms/include/DB/Columns/ColumnVector.h +++ b/dbms/include/DB/Columns/ColumnVector.h @@ -155,6 +155,11 @@ public: return data.size() * sizeof(data[0]); } + void insert(const T value) + { + data.push_back(value); + } + int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override { return CompareHelper::compare(data[n], static_cast(rhs_).data[m], nan_direction_hint); diff --git a/dbms/include/DB/Dictionaries/TypeCheckingBlockInputStream.h b/dbms/include/DB/Dictionaries/TypeCheckingBlockInputStream.h new file mode 100644 index 00000000000..2ef7f3f86b6 --- /dev/null +++ b/dbms/include/DB/Dictionaries/TypeCheckingBlockInputStream.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +namespace DB +{ + +class TypeCheckingBlockInputStream : public IBlockInputStream +{ +public: + TypeCheckingBlockInputStream(const BlockInputStreamPtr & source, Block sample_block) + { + + } + +}; + +} From 33f41657dfefebb43a0aa5ad9712a8fce8433571 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 23 Mar 2015 16:24:56 +0300 Subject: [PATCH 2/7] dbms: improve structure and speed of MySQLBlockInputStream. [#METR-13298] --- .../DB/Dictionaries/MySQLBlockInputStream.h | 100 +++++++++++++----- 1 file changed, 71 insertions(+), 29 deletions(-) diff --git a/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h b/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h index d8a14e3c988..d7cea365cf2 100644 --- a/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h +++ b/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,13 @@ public: : entry{entry}, query{this->entry->query(query_str)}, result{query.use()}, sample_block{sample_block}, max_block_size{max_block_size} { + if (sample_block.columns() != result.getNumFields()) + throw Exception{ + "mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " + + toString(sample_block.columns()) + " expected", + ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH + }; + types.reserve(sample_block.columns()); for (const auto idx : ext::range(0, sample_block.columns())) @@ -91,48 +99,82 @@ public: private: Block readImpl() override { + auto row = result.fetch(); + if (!row) + return {}; + auto block = sample_block.cloneEmpty(); - if (block.columns() != result.getNumFields()) - throw Exception{ - "mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " + - toString(block.columns()) + " expected", - ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH - }; + /// cache pointers returned by the calls to getByPosition + std::vector columns(block.columns()); + for (const auto i : ext::range(0, columns.size())) + columns[i] = block.getByPosition(i).column.get(); - std::size_t rows = 0; - while (auto row = result.fetch()) + std::size_t num_rows = 0; + while (row) { - /// @todo cache pointers returned by the calls to getByPosition for (const auto idx : ext::range(0, row.size())) - insertValue(block.getByPosition(idx).column, row[idx], types[idx]); + { + const auto value = row[idx]; + if (!value.isNull()) + insertValue(columns[idx], types[idx], value); + else + insertDefaultValue(columns[idx], types[idx]); + } - ++rows; - if (rows == max_block_size) + ++num_rows; + if (num_rows == max_block_size) break; + + row = result.fetch(); } - return rows == 0 ? Block{} : block; - }; + return block; + } - static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const value_type_t type) + static void insertValue(IColumn * const column, const value_type_t type, const mysqlxx::Value & value) { switch (type) { - case value_type_t::UInt8: column->insert(static_cast(value)); break; - case value_type_t::UInt16: column->insert(static_cast(value)); break; - case value_type_t::UInt32: column->insert(static_cast(value)); break; - case value_type_t::UInt64: column->insert(static_cast(value)); break; - case value_type_t::Int8: column->insert(static_cast(value)); break; - case value_type_t::Int16: column->insert(static_cast(value)); break; - case value_type_t::Int32: column->insert(static_cast(value)); break; - case value_type_t::Int64: column->insert(static_cast(value)); break; - case value_type_t::Float32: column->insert(static_cast(value)); break; - case value_type_t::Float64: column->insert(static_cast(value)); break; - case value_type_t::String: column->insert(value.getString()); break; - case value_type_t::Date: column->insert(static_cast(UInt16{value.getDate().getDayNum()})); break; - case value_type_t::DateTime: column->insert(static_cast(time_t{value.getDateTime()})); break; - }; + case value_type_t::UInt8: static_cast(column)->insert(value.getUInt()); break; + case value_type_t::UInt16: static_cast(column)->insert(value.getUInt()); break; + case value_type_t::UInt32: static_cast(column)->insert(value.getUInt()); break; + case value_type_t::UInt64: static_cast(column)->insert(value.getUInt()); break; + case value_type_t::Int8: static_cast(column)->insert(value.getInt()); break; + case value_type_t::Int16: static_cast(column)->insert(value.getInt()); break; + case value_type_t::Int32: static_cast(column)->insert(value.getInt()); break; + case value_type_t::Int64: static_cast(column)->insert(value.getInt()); break; + case value_type_t::Float32: static_cast(column)->insert(value.getDouble()); break; + case value_type_t::Float64: static_cast(column)->insert(value.getDouble()); break; + case value_type_t::String: + { + const auto string = value.getString(); + static_cast(column)->insertDataWithTerminatingZero(string.data(), string.size() + 1); + break; + } + case value_type_t::Date: static_cast(column)->insert(UInt16{value.getDate().getDayNum()}); break; + case value_type_t::DateTime: static_cast(column)->insert(time_t{value.getDateTime()}); break; + } + } + + static void insertDefaultValue(IColumn * const column, const value_type_t type) + { + switch (type) + { + case value_type_t::UInt8: static_cast(column)->insertDefault(); break; + case value_type_t::UInt16: static_cast(column)->insertDefault(); break; + case value_type_t::UInt32: static_cast(column)->insertDefault(); break; + case value_type_t::UInt64: static_cast(column)->insertDefault(); break; + case value_type_t::Int8: static_cast(column)->insertDefault(); break; + case value_type_t::Int16: static_cast(column)->insertDefault(); break; + case value_type_t::Int32: static_cast(column)->insertDefault(); break; + case value_type_t::Int64: static_cast(column)->insertDefault(); break; + case value_type_t::Float32: static_cast(column)->insertDefault(); break; + case value_type_t::Float64: static_cast(column)->insertDefault(); break; + case value_type_t::String: static_cast(column)->insertDefault(); break; + case value_type_t::Date: static_cast(column)->insertDefault(); break; + case value_type_t::DateTime: static_cast(column)->insertDefault(); break; + } } mysqlxx::PoolWithFailover::Entry entry; From f321a422b7eb7777297be3b80c62483c273f7073 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 23 Mar 2015 16:25:34 +0300 Subject: [PATCH 3/7] dbms: FlatDictionary: fix off-by-one error on resize. [#METR-13298] --- dbms/include/DB/Dictionaries/FlatDictionary.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/include/DB/Dictionaries/FlatDictionary.h b/dbms/include/DB/Dictionaries/FlatDictionary.h index 756255d1bf6..34420baa099 100644 --- a/dbms/include/DB/Dictionaries/FlatDictionary.h +++ b/dbms/include/DB/Dictionaries/FlatDictionary.h @@ -262,7 +262,7 @@ private: { auto & array = *std::get>>(attribute.arrays); if (id >= array.size()) - array.resize_fill(id, std::get(attribute.null_values)); + array.resize_fill(id + 1, std::get(attribute.null_values)); array[id] = value; } @@ -290,7 +290,7 @@ private: { auto & array = *std::get>>(attribute.arrays); if (id >= array.size()) - array.resize_fill(id, std::get(attribute.null_values)); + array.resize_fill(id + 1, std::get(attribute.null_values)); const auto & string = value.get(); const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size()); array[id] = StringRef{string_in_arena, string.size()}; From 297d3263f262a605180299956b56732c10a6fa36 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 23 Mar 2015 16:26:13 +0300 Subject: [PATCH 4/7] dbms: PODArray::resize_fill(size_t, T): fix erroneous pointer arithmetic --- dbms/include/DB/Common/PODArray.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/include/DB/Common/PODArray.h b/dbms/include/DB/Common/PODArray.h index 222dfa39ec5..78510986b08 100644 --- a/dbms/include/DB/Common/PODArray.h +++ b/dbms/include/DB/Common/PODArray.h @@ -253,7 +253,7 @@ public: if (n > old_size) { reserve(n); - std::fill(t_end(), reinterpret_cast(c_end + n - old_size), value); + std::fill(t_end(), t_end() + n - old_size, value); } c_end = c_start + byte_size(n); } From f673cedaa3d42c84004b9d9d6b100eb78d7a2198 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 23 Mar 2015 16:27:33 +0300 Subject: [PATCH 5/7] dbms: allow node alongside as requested by @zurom. [#METR-13298] --- dbms/src/Interpreters/ExternalDictionaries.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/ExternalDictionaries.cpp b/dbms/src/Interpreters/ExternalDictionaries.cpp index 922ad484e0b..589e2e9496d 100644 --- a/dbms/src/Interpreters/ExternalDictionaries.cpp +++ b/dbms/src/Interpreters/ExternalDictionaries.cpp @@ -132,7 +132,10 @@ void ExternalDictionaries::reloadFromFile(const std::string & config_path) if (0 != strncmp(key.data(), "dictionary", strlen("dictionary"))) { - LOG_WARNING(log, config_path << ": unknown node in dictionaries file: '" << key + "', 'dictionary'"); + if (0 != strncmp(key.data(), "comment", strlen("comment"))) + LOG_WARNING(log, + config_path << ": unknown node in dictionaries file: '" << key + "', 'dictionary'"); + continue; } From 006360b75ac6e934b4ffe731036541db79d5d785 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 23 Mar 2015 16:28:08 +0300 Subject: [PATCH 6/7] dbms: dictionaries: more meaningful exception message on null_value parsing error. [#METR-13298] --- dbms/include/DB/Dictionaries/DictionaryStructure.h | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/dbms/include/DB/Dictionaries/DictionaryStructure.h b/dbms/include/DB/Dictionaries/DictionaryStructure.h index 6f46010e0d3..5a02bb35928 100644 --- a/dbms/include/DB/Dictionaries/DictionaryStructure.h +++ b/dbms/include/DB/Dictionaries/DictionaryStructure.h @@ -144,8 +144,18 @@ struct DictionaryStructure const auto null_value_string = config.getString(prefix + "null_value"); Field null_value; - ReadBufferFromString null_value_buffer{null_value_string}; - type->deserializeText(null_value, null_value_buffer); + try + { + ReadBufferFromString null_value_buffer{null_value_string}; + type->deserializeText(null_value, null_value_buffer); + } + catch (const std::exception & e) + { + throw Exception{ + std::string{"Error parsing null_value: "} + e.what(), + ErrorCodes::BAD_ARGUMENTS + }; + } const auto hierarchical = config.getBool(prefix + "hierarchical", false); const auto injective = config.getBool(prefix + "injective", false); From cf315e35efaee436cf0e2fc223f30fbc0914959f Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Mon, 23 Mar 2015 17:28:16 +0300 Subject: [PATCH 7/7] dbms: support non-expiring cache lines in CacheDictionary. [#METR-13298] --- dbms/include/DB/Dictionaries/CacheDictionary.h | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/dbms/include/DB/Dictionaries/CacheDictionary.h b/dbms/include/DB/Dictionaries/CacheDictionary.h index 939c5fdf6a2..dbaa4118f2c 100644 --- a/dbms/include/DB/Dictionaries/CacheDictionary.h +++ b/dbms/include/DB/Dictionaries/CacheDictionary.h @@ -455,7 +455,10 @@ private: } cell.id = id; - cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; + if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) + cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; + else + cell.expires_at = std::chrono::time_point::max(); on_cell_updated(id, cell_idx); remaining_ids[id] = 1; @@ -477,7 +480,10 @@ private: setDefaultAttributeValue(attribute, cell_idx); cell.id = id; - cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; + if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) + cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; + else + cell.expires_at = std::chrono::time_point::max(); on_cell_updated(id, cell_idx); }