From 93464f52f417cf03af7e91d89c6d3518ac75a0e1 Mon Sep 17 00:00:00 2001 From: zhanglistar Date: Fri, 15 Nov 2024 22:33:11 +0800 Subject: [PATCH 1/3] add simd ondemand parser --- base/base/defines.h | 7 + src/Common/ErrorCodes.cpp | 1 + src/Common/JSONParsers/SimdJSONParser.h | 297 +++++++++++++++++++++++- src/Formats/JSONExtractTree.cpp | 14 +- src/Functions/FunctionsJSON.cpp | 27 ++- 5 files changed, 327 insertions(+), 19 deletions(-) diff --git a/base/base/defines.h b/base/base/defines.h index 5685a6d9833..78f5e94ee7a 100644 --- a/base/base/defines.h +++ b/base/base/defines.h @@ -164,3 +164,10 @@ template constexpr void UNUSED(Args &&... args [[maybe_unused]]) // NOLINT(cppcoreguidelines-missing-std-forward) { } + +#define DB_CONCATENATE_IMPL(s1, s2) s1##s2 +#define DB_CONCATENATE(s1, s2) DB_CONCATENATE_IMPL(s1, s2) + +#define DB_ANONYMOUS_VARIABLE(str) \ + DB_CONCATENATE(DB_CONCATENATE(DB_CONCATENATE(str, __COUNTER__), _), __LINE__) + diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 3f4a75fae3c..9eed8e6a0bd 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -611,6 +611,7 @@ M(730, REFRESH_FAILED) \ M(731, QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE) \ M(733, TABLE_IS_BEING_RESTARTED) \ + M(734, JSON_PARSE_ERROR) \ \ M(900, DISTRIBUTED_CACHE_ERROR) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ diff --git a/src/Common/JSONParsers/SimdJSONParser.h b/src/Common/JSONParsers/SimdJSONParser.h index db679b14f52..412cf2fa0d6 100644 --- a/src/Common/JSONParsers/SimdJSONParser.h +++ b/src/Common/JSONParsers/SimdJSONParser.h @@ -3,6 +3,7 @@ #include "config.h" #if USE_SIMDJSON +# include # include # include # include @@ -18,8 +19,19 @@ namespace DB namespace ErrorCodes { extern const int CANNOT_ALLOCATE_MEMORY; + extern const int JSON_PARSE_ERROR; } +#define SIMDJSON_ASSIGN_OR_THROW_IMPL(_result, _lhs, _rexpr) \ + auto && _result = (_rexpr); \ + if (_result.error() != ::simdjson::SUCCESS) \ + throw DB::ErrnoException(ErrorCodes::JSON_PARSE_ERROR, "simdjson error: {}", std::string(::simdjson::error_message(_result.error()))); \ + _lhs = std::move(_result).value_unsafe() + +#define SIMDJSON_ASSIGN_OR_THROW(_lhs, _rexpr) \ + SIMDJSON_ASSIGN_OR_THROW_IMPL( \ + DB_ANONYMOUS_VARIABLE(_simdjson_sesult), _lhs, _rexpr) + /// Format elements of basic types into string. /// The original implementation is mini_formatter in simdjson.h. But it is not public API, so we /// add a implementation here. @@ -264,13 +276,93 @@ public: format.key(kv.key); append(kv.value); } + + void append(simdjson::ondemand::value value) + { + switch (value.type()) + { + case simdjson::ondemand::json_type::array: + append(value.get_array()); + break; + case simdjson::ondemand::json_type::object: + append(value.get_object()); + break; + case simdjson::ondemand::json_type::number: + { + + simdjson::ondemand::number_type nt{}; + auto res = value.get_number_type().get(nt); + chassert(res == simdjson::SUCCESS); + switch(nt) + { + case simdjson::ondemand::number_type::signed_integer: + format.number(value.get_int64().value_unsafe()); + break; + case simdjson::ondemand::number_type::unsigned_integer: + format.number(value.get_uint64().value_unsafe()); + break; + case simdjson::ondemand::number_type::floating_point_number: + format.number(value.get_double().value_unsafe()); + break; + case simdjson::ondemand::number_type::big_integer: + format.string(value.get_string().value_unsafe()); + break; + } + break; + } + case simdjson::ondemand::json_type::string: + format.string(value.get_string().value_unsafe()); + break; + case simdjson::ondemand::json_type::boolean: + if (value.get_bool().value_unsafe()) + format.trueAtom(); + else + format.falseAtom(); + break; + case simdjson::ondemand::json_type::null: + format.nullAtom(); + break; + } + } + + void append(simdjson::ondemand::array array) + { + format.startArray(); + int i = 0; + for (simdjson::ondemand::value value : array) + { + if (i++ != 0) + format.comma(); + append(value); + } + format.endArray(); + } + + void append(simdjson::ondemand::object object) + { + format.startObject(); + int i = 0; + for (simdjson::ondemand::field field : object) + { + if (i++ != 0) + format.comma(); + append(field); + } + format.endObject(); + } + + void append(simdjson::ondemand::field field) + { + format.key(field.unescaped_key()); + append(field.value()); + } private: SimdJSONBasicFormatter format; }; /// This class can be used as an argument for the template class FunctionJSON. /// It provides ability to parse JSONs using simdjson library. -struct SimdJSONParser +struct DomSimdJSONParser { class Array; class Object; @@ -419,16 +511,215 @@ private: simdjson::dom::parser parser; }; -inline ALWAYS_INLINE SimdJSONParser::Array SimdJSONParser::Element::getArray() const +inline ALWAYS_INLINE DomSimdJSONParser::Array DomSimdJSONParser::Element::getArray() const { return element.get_array().value_unsafe(); } -inline ALWAYS_INLINE SimdJSONParser::Object SimdJSONParser::Element::getObject() const +inline ALWAYS_INLINE DomSimdJSONParser::Object DomSimdJSONParser::Element::getObject() const { return element.get_object().value_unsafe(); } +struct OnDemandSimdJSONParser +{ + class Array; + class Object; + + /// References an element in a JSON document, representing a JSON null, boolean, string, number, + /// array or object. + class Element + { + public: + ALWAYS_INLINE Element() {} /// NOLINT + ALWAYS_INLINE Element(simdjson::ondemand::value && value_) { value = std::move(value_); } + ALWAYS_INLINE Element & operator=(const simdjson::ondemand::value & value_) { value = value_; return *this; } + + ALWAYS_INLINE ElementType type() const + { + if (value.type() == simdjson::ondemand::json_type::object) + return ElementType::OBJECT; + if (value.type() == simdjson::ondemand::json_type::array) + return ElementType::ARRAY; + if (value.type() == simdjson::ondemand::json_type::boolean) + return ElementType::BOOL; + if (value.type() == simdjson::ondemand::json_type::string) + return ElementType::STRING; + if (value.type() == simdjson::ondemand::json_type::number) + { + auto res = value.get_number_type(); + if (res.error()) + return ElementType::NULL_VALUE; + if (res.value() == simdjson::ondemand::number_type::signed_integer) + return ElementType::INT64; + if (res.value() == simdjson::ondemand::number_type::unsigned_integer) + return ElementType::UINT64; + if (res.value() == simdjson::ondemand::number_type::floating_point_number) + return ElementType::DOUBLE; + } + return ElementType::NULL_VALUE; + } + + ALWAYS_INLINE bool isInt64() const { auto res = value.get_number_type(); return !res.error() && res.value() == simdjson::ondemand::number_type::signed_integer; } + ALWAYS_INLINE bool isUInt64() const { auto res = value.get_number_type(); return !res.error() && res.value() == simdjson::ondemand::number_type::unsigned_integer; } + ALWAYS_INLINE bool isDouble() const { auto res = value.get_number_type(); return !res.error() && res.value() == simdjson::ondemand::number_type::floating_point_number; } + ALWAYS_INLINE bool isString() const { auto r = value.type(); return !r.error() && r.value() == simdjson::ondemand::json_type::string; } + ALWAYS_INLINE bool isArray() const + { + auto r = value.type(); + return !r.error() && r.value() == simdjson::ondemand::json_type::array; + } + ALWAYS_INLINE bool isObject() const + { + auto r = value.type(); + return !r.error() && r.value() == simdjson::ondemand::json_type::object; + } + ALWAYS_INLINE bool isBool() const { return value.type() == simdjson::ondemand::json_type::boolean; } + ALWAYS_INLINE bool isNull() const { return value.type() == simdjson::ondemand::json_type::null; } + + ALWAYS_INLINE Int64 getInt64() const { return value.get_int64().value(); } + ALWAYS_INLINE UInt64 getUInt64() const { return value.get_uint64().value(); } + ALWAYS_INLINE double getDouble() const { return value.get_double().value(); } + ALWAYS_INLINE bool getBool() const { return value.get_bool().value(); } + ALWAYS_INLINE std::string_view getString() const + { + auto r = value.get_string(); + if (r.error()) + return {}; + return r.value(); + } + ALWAYS_INLINE Array getArray() const + { + return value.get_array().value(); + } + ALWAYS_INLINE Object getObject() const + { + return value.get_object().value(); + } + + ALWAYS_INLINE simdjson::ondemand::value getElement() const { return value; } + + private: + mutable simdjson::ondemand::value value; + }; + + /// References an array in a JSON document. + class Array + { + public: + class Iterator + { + public: + ALWAYS_INLINE Iterator(const simdjson::ondemand::array_iterator & it_) : it(it_) {} /// NOLINT + ALWAYS_INLINE Element operator*() const { return (*it).value(); } + ALWAYS_INLINE Iterator & operator++() { ++it; return *this; } + ALWAYS_INLINE friend bool operator!=(const Iterator & left, const Iterator & right) { return left.it != right.it; } + ALWAYS_INLINE friend bool operator==(const Iterator & left, const Iterator & right) { return !(left != right); } + private: + mutable simdjson::ondemand::array_iterator it; + }; + + ALWAYS_INLINE Array(const simdjson::ondemand::array & array_) : array(array_) {} /// NOLINT + ALWAYS_INLINE Iterator begin() const { return array.begin().value(); } + ALWAYS_INLINE Iterator end() const { return array.end().value(); } + ALWAYS_INLINE size_t size() const { return array.count_elements().value(); } + ALWAYS_INLINE Element operator[](size_t index) const { return array.at(index).value(); } + + private: + mutable simdjson::ondemand::array array; + }; + + using KeyValuePair = std::pair; + + /// References an object in a JSON document. + class Object + { + public: + class Iterator + { + public: + ALWAYS_INLINE Iterator(const simdjson::ondemand::object_iterator & it_) : it(it_) {} /// NOLINT + + ALWAYS_INLINE KeyValuePair operator*() const + { + SIMDJSON_ASSIGN_OR_THROW(auto field_wrapper, *it); + SIMDJSON_ASSIGN_OR_THROW(std::string_view key, field_wrapper.unescaped_key()); + ::simdjson::ondemand::value v = field_wrapper.value(); + return {key, Element(std::move(v))}; + } + + ALWAYS_INLINE Iterator & operator++() { ++it; return *this; } + ALWAYS_INLINE friend bool operator!=(const Iterator & left, const Iterator & right) { return left.it != right.it; } + ALWAYS_INLINE friend bool operator==(const Iterator & left, const Iterator & right) { return !(left != right); } + private: + mutable simdjson::ondemand::object_iterator it; + }; + + ALWAYS_INLINE Object(const simdjson::ondemand::object & object_) : object(object_) {} /// NOLINT + ALWAYS_INLINE Iterator begin() const { return object.begin().value(); } + ALWAYS_INLINE Iterator end() const { return object.end().value(); } + ///NOTE: call size() before iterate + ALWAYS_INLINE size_t size() const + { + return object.count_fields().value(); + } + + bool find(std::string_view key, Element & result) const + { + auto x = object.find_field_unordered(key); + if (x.error()) + return false; + + result = x.value_unsafe(); + return true; + } + + /// Optional: Provides access to an object's element by index. + KeyValuePair operator[](size_t index) const + { + ///SIMDJSON_ASSIGN_OR_THROW(auto b, object.reset()); + ///(void)b; + SIMDJSON_ASSIGN_OR_THROW(auto it, object.begin()); + while (index--) + { + (void)*(it); /// NEED TO DO THIS TO ITERATE + ++it; + } + SIMDJSON_ASSIGN_OR_THROW(auto field, *it); + std::string_view key = field.unescaped_key().value(); + simdjson::ondemand::value value = field.value(); + return {key, Element(std::move(value))}; + } + + private: + mutable simdjson::ondemand::object object; + }; + + /// Parses a JSON document, returns the reference to its root element if succeeded. + bool parse(std::string_view json, Element & result) + { + padstr = json; + auto res = parser.iterate(padstr); + if (res.error()) + return false; + + document = std::move(res.value()); + auto v = document.get_value(); + if (v.error()) + return false; + + result = v.value(); + return true; + } + +private: + simdjson::ondemand::parser parser; + simdjson::ondemand::document document{}; + simdjson::padded_string padstr; +}; + +using SimdJSONParser = OnDemandSimdJSONParser; + } #endif diff --git a/src/Formats/JSONExtractTree.cpp b/src/Formats/JSONExtractTree.cpp index ae6051823b7..18bfd431419 100644 --- a/src/Formats/JSONExtractTree.cpp +++ b/src/Formats/JSONExtractTree.cpp @@ -1210,9 +1210,9 @@ public: auto array = element.getArray(); auto it = array.begin(); - for (size_t index = 0; (index != nested.size()) && (it != array.end()); ++index) + for (size_t index = 0; (index != nested.size()) && (it != array.end()); ++index, ++it) { - if (nested[index]->insertResultToColumn(tuple.getColumn(index), *it++, insert_settings, format_settings, error)) + if (nested[index]->insertResultToColumn(tuple.getColumn(index), *it, insert_settings, format_settings, error)) { were_valid_elements = true; } @@ -1238,9 +1238,9 @@ public: if (name_to_index_map.empty()) { auto it = object.begin(); - for (size_t index = 0; (index != nested.size()) && (it != object.end()); ++index) + for (size_t index = 0; (index != nested.size()) && (it != object.end()); ++index, ++it) { - if (nested[index]->insertResultToColumn(tuple.getColumn(index), (*it++).second, insert_settings, format_settings, error)) + if (nested[index]->insertResultToColumn(tuple.getColumn(index), (*it).second, insert_settings, format_settings, error)) { were_valid_elements = true; } @@ -1315,18 +1315,18 @@ public: error = fmt::format("cannot read Map value from JSON element: {}", jsonElementToString(element, format_settings)); return false; } - auto & map_col = assert_cast(column); auto & offsets = map_col.getNestedColumn().getOffsets(); auto & tuple_col = map_col.getNestedData(); auto & key_col = tuple_col.getColumn(0); auto & value_col = tuple_col.getColumn(1); size_t old_size = tuple_col.size(); - auto object = element.getObject(); auto it = object.begin(); + size_t object_size{}; for (; it != object.end(); ++it) { + ++object_size; auto pair = *it; /// Insert key @@ -1349,7 +1349,7 @@ public: } } - offsets.push_back(old_size + object.size()); + offsets.push_back(old_size + object_size); return true; } diff --git a/src/Functions/FunctionsJSON.cpp b/src/Functions/FunctionsJSON.cpp index 4106ade6dee..0a14a8ad48e 100644 --- a/src/Functions/FunctionsJSON.cpp +++ b/src/Functions/FunctionsJSON.cpp @@ -273,12 +273,13 @@ private: if (element.isArray()) { auto array = element.getArray(); + size_t array_size = array.size(); if (index >= 0) --index; else - index += array.size(); + index += array_size; - if (static_cast(index) >= array.size()) + if (static_cast(index) >= array_size) return false; element = array[index]; out_key = {}; @@ -290,12 +291,13 @@ private: if (element.isObject()) { auto object = element.getObject(); + size_t object_size = object.size(); if (index >= 0) --index; else - index += object.size(); + index += object_size; - if (static_cast(index) >= object.size()) + if (static_cast(index) >= object_size) return false; std::tie(out_key, element) = object[index]; return true; @@ -621,7 +623,7 @@ public: static bool insertResultToColumn(IColumn & dest, const Element & element, std::string_view, const FormatSettings &, String &) { - size_t size; + size_t size{}; if (element.isArray()) size = element.getArray().size(); else if (element.isObject()) @@ -984,10 +986,13 @@ public: auto array = element.getArray(); ColumnArray & col_res = assert_cast(dest); + size_t size = 0; for (auto value : array) + { + ++size; JSONExtractRawImpl::insertResultToColumn(col_res.getData(), value, {}, format_settings, error); - - col_res.getOffsets().push_back(col_res.getOffsets().back() + array.size()); + } + col_res.getOffsets().push_back(col_res.getOffsets().back() + size); return true; } }; @@ -1020,13 +1025,15 @@ public: auto & col_key = assert_cast(col_tuple.getColumn(0)); auto & col_value = assert_cast(col_tuple.getColumn(1)); + size_t size = 0; for (const auto & [key, value] : object) { col_key.insertData(key.data(), key.size()); JSONExtractRawImpl::insertResultToColumn(col_value, value, {}, format_settings, error); + ++size; } - col_arr.getOffsets().push_back(col_arr.getOffsets().back() + object.size()); + col_arr.getOffsets().push_back(col_arr.getOffsets().back() + size); return true; } }; @@ -1054,12 +1061,14 @@ public: ColumnArray & col_res = assert_cast(dest); auto & col_key = assert_cast(col_res.getData()); + size_t count = 0; for (const auto & [key, value] : object) { + ++count; col_key.insertData(key.data(), key.size()); } - col_res.getOffsets().push_back(col_res.getOffsets().back() + object.size()); + col_res.getOffsets().push_back(col_res.getOffsets().back() + count); return true; } }; From 7f6dcb854b788a20a9f6900f245033d7ff345923 Mon Sep 17 00:00:00 2001 From: zhanglistar Date: Wed, 20 Nov 2024 15:06:09 +0800 Subject: [PATCH 2/3] add batch visitor --- base/base/defines.h | 2 +- src/Common/JSONParsers/DummyJSONParser.h | 2 +- src/Common/JSONParsers/SimdJSONParser.h | 49 +++++++++++-- src/Functions/FunctionSQLJSON.h | 24 +++--- .../JSONPath/Generator/GeneratorJSONPath.h | 32 ++++++-- src/Functions/JSONPath/Generator/IGenerator.h | 6 +- src/Functions/JSONPath/Generator/IVisitor.h | 6 +- .../Generator/VisitorJSONPathMemberAccess.h | 37 +++++++--- .../JSONPath/Generator/VisitorJSONPathRange.h | 73 +++++++++++++++++-- .../JSONPath/Generator/VisitorJSONPathRoot.h | 13 +++- .../JSONPath/Generator/VisitorJSONPathStar.h | 61 ++++++++++++++-- 11 files changed, 258 insertions(+), 47 deletions(-) diff --git a/base/base/defines.h b/base/base/defines.h index 78f5e94ee7a..6ef7528688c 100644 --- a/base/base/defines.h +++ b/base/base/defines.h @@ -169,5 +169,5 @@ constexpr void UNUSED(Args &&... args [[maybe_unused]]) // NOLINT(cppcoreguideli #define DB_CONCATENATE(s1, s2) DB_CONCATENATE_IMPL(s1, s2) #define DB_ANONYMOUS_VARIABLE(str) \ - DB_CONCATENATE(DB_CONCATENATE(DB_CONCATENATE(str, __COUNTER__), _), __LINE__) + DB_CONCATENATE(DB_CONCATENATE(DB_CONCATENATE(str, __COUNTER__), _), __LINE__) diff --git a/src/Common/JSONParsers/DummyJSONParser.h b/src/Common/JSONParsers/DummyJSONParser.h index 394a2817ea1..12432e2ecad 100644 --- a/src/Common/JSONParsers/DummyJSONParser.h +++ b/src/Common/JSONParsers/DummyJSONParser.h @@ -87,7 +87,7 @@ struct DummyJSONParser static Iterator end() { return {}; } static size_t size() { return 0; } bool find(std::string_view, Element &) const { return false; } /// NOLINT - + bool reset() { return true; } #if 0 /// Optional: Provides access to an object's element by index. KeyValuePair operator[](size_t) const { return {}; } diff --git a/src/Common/JSONParsers/SimdJSONParser.h b/src/Common/JSONParsers/SimdJSONParser.h index 412cf2fa0d6..70306cbe57e 100644 --- a/src/Common/JSONParsers/SimdJSONParser.h +++ b/src/Common/JSONParsers/SimdJSONParser.h @@ -1,5 +1,6 @@ #pragma once +#include "Common/StackTrace.h" #include "config.h" #if USE_SIMDJSON @@ -590,11 +591,13 @@ struct OnDemandSimdJSONParser } ALWAYS_INLINE Array getArray() const { - return value.get_array().value(); + SIMDJSON_ASSIGN_OR_THROW(auto arr, value.get_array()); + return arr; } ALWAYS_INLINE Object getObject() const { - return value.get_object().value(); + SIMDJSON_ASSIGN_OR_THROW(auto obj, value.get_object()); + return obj; } ALWAYS_INLINE simdjson::ondemand::value getElement() const { return value; } @@ -610,6 +613,7 @@ struct OnDemandSimdJSONParser class Iterator { public: + Iterator() = default; ALWAYS_INLINE Iterator(const simdjson::ondemand::array_iterator & it_) : it(it_) {} /// NOLINT ALWAYS_INLINE Element operator*() const { return (*it).value(); } ALWAYS_INLINE Iterator & operator++() { ++it; return *this; } @@ -623,9 +627,26 @@ struct OnDemandSimdJSONParser ALWAYS_INLINE Iterator begin() const { return array.begin().value(); } ALWAYS_INLINE Iterator end() const { return array.end().value(); } ALWAYS_INLINE size_t size() const { return array.count_elements().value(); } - ALWAYS_INLINE Element operator[](size_t index) const { return array.at(index).value(); } + ALWAYS_INLINE Element operator[](size_t index) const + { + if (index < last_index) + array.reset(); + if (last_index == 0) + { + SIMDJSON_ASSIGN_OR_THROW(auto iter, array.begin()); + it = iter; + } + size_t diff = index - last_index; + while (diff--) + ++it; + last_index = index; + SIMDJSON_ASSIGN_OR_THROW(auto ele, *it); + return ele; + } private: + mutable size_t last_index{}; + mutable simdjson::ondemand::array_iterator it; mutable simdjson::ondemand::array array; }; @@ -642,8 +663,18 @@ struct OnDemandSimdJSONParser ALWAYS_INLINE KeyValuePair operator*() const { - SIMDJSON_ASSIGN_OR_THROW(auto field_wrapper, *it); - SIMDJSON_ASSIGN_OR_THROW(std::string_view key, field_wrapper.unescaped_key()); + //SIMDJSON_ASSIGN_OR_THROW(auto field_wrapper, *it); + auto field_wrapper = *it; + if (field_wrapper.error()) + { + return {}; + } + std::string_view key; + auto key_error = field_wrapper.unescaped_key().get(key); + if (key_error) + { + return {}; + } ::simdjson::ondemand::value v = field_wrapper.value(); return {key, Element(std::move(v))}; } @@ -674,6 +705,14 @@ struct OnDemandSimdJSONParser return true; } + bool reset() + { + auto v = object.reset(); + if (v.error()) + return false; + return true; + } + /// Optional: Provides access to an object's element by index. KeyValuePair operator[](size_t index) const { diff --git a/src/Functions/FunctionSQLJSON.h b/src/Functions/FunctionSQLJSON.h index d56e655e126..300c6152b4d 100644 --- a/src/Functions/FunctionSQLJSON.h +++ b/src/Functions/FunctionSQLJSON.h @@ -102,8 +102,12 @@ public: } /// serialize the json element into column's buffer directly - void addElement(const Element & element) + void addElement(const Element & element, const std::string sep = ", ") { + if (is_first) + is_first = false; + else + addRawData(sep.data(), sep.size()); formatter.append(element.getElement()); } void commit() @@ -121,7 +125,7 @@ private: IColumn::Offsets & offsets; Formatter formatter; size_t prev_offset; - + bool is_first{true}; }; @@ -364,7 +368,7 @@ public: /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6), /// however this functionality is not implemented yet } - current_element = root; + //current_element = root; } if (status == VisitorStatus::Exhausted) @@ -416,19 +420,16 @@ public: bool success = false; const char * array_begin = "["; const char * array_end = "]"; - const char * comma = ", "; + //const char * comma = ", "; JSONStringSerializer json_serializer(col_str); json_serializer.addRawData(array_begin, 1); - while ((status = generator_json_path.getNextItem(current_element)) != VisitorStatus::Exhausted) + //std::vector result{}; + std::function result_func= [&json_serializer](const Element & element) { json_serializer.addElement(element); }; + while ((status = generator_json_path.getNextItemBatch(current_element, result_func)) != VisitorStatus::Exhausted) { if (status == VisitorStatus::Ok) { - if (success) - { - json_serializer.addRawData(comma, 2); - } success = true; - json_serializer.addElement(current_element); } else if (status == VisitorStatus::Error) { @@ -436,7 +437,8 @@ public: /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6), /// however this functionality is not implemented yet } - current_element = root; + //current_element = root; + //result.clear(); } if (!success) { diff --git a/src/Functions/JSONPath/Generator/GeneratorJSONPath.h b/src/Functions/JSONPath/Generator/GeneratorJSONPath.h index 1016d776be0..02ef44a3adc 100644 --- a/src/Functions/JSONPath/Generator/GeneratorJSONPath.h +++ b/src/Functions/JSONPath/Generator/GeneratorJSONPath.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -21,6 +22,7 @@ template class GeneratorJSONPath : public IGenerator { public: + using TElement = typename JSONParser::Element; /** * Traverses children ASTs of ASTJSONPathQuery and creates a vector of corresponding visitors * @param query_ptr_ pointer to ASTJSONPathQuery @@ -80,11 +82,6 @@ public: return VisitorStatus::Exhausted; } - for (int i = 0; i < current_visitor; ++i) - { - visitors[i]->apply(current); - } - VisitorStatus status = VisitorStatus::Error; for (size_t i = current_visitor; i < visitors.size(); ++i) { @@ -105,6 +102,31 @@ public: } } + VisitorStatus getNextItemBatch(TElement & element, std::function & res_func) override + { + while (true) + { + /// element passed to us actually is root, so here we assign current to root + auto current = element; + if (current_visitor < 0) + return VisitorStatus::Exhausted; + + VisitorStatus status = VisitorStatus::Error; + size_t visitor_size = visitors.size(); + for (size_t i = current_visitor; i < visitor_size; ++i) + { + status = visitors[i]->visitBatch(current, res_func, i == visitor_size - 1); + current_visitor = static_cast(i); + if (status == VisitorStatus::Error || status == VisitorStatus::Ignore) + break; + } + updateVisitorsForNextRun(); + + if (status != VisitorStatus::Ignore) + return status; + } + } + void reinitialize() { while (current_visitor >= 0) diff --git a/src/Functions/JSONPath/Generator/IGenerator.h b/src/Functions/JSONPath/Generator/IGenerator.h index d3a6c32de72..a91c940316d 100644 --- a/src/Functions/JSONPath/Generator/IGenerator.h +++ b/src/Functions/JSONPath/Generator/IGenerator.h @@ -10,6 +10,8 @@ template class IGenerator { public: + using TElement = typename JSONParser::Element; + IGenerator() = default; virtual const char * getName() const = 0; @@ -20,7 +22,9 @@ public: * @param element to be extracted into * @return true if generator is not exhausted */ - virtual VisitorStatus getNextItem(typename JSONParser::Element & element) = 0; + virtual VisitorStatus getNextItem(TElement & element) = 0; + + virtual VisitorStatus getNextItemBatch(TElement & element, std::function & ) = 0; virtual ~IGenerator() = default; }; diff --git a/src/Functions/JSONPath/Generator/IVisitor.h b/src/Functions/JSONPath/Generator/IVisitor.h index 1a94106a435..1a39a09bad6 100644 --- a/src/Functions/JSONPath/Generator/IVisitor.h +++ b/src/Functions/JSONPath/Generator/IVisitor.h @@ -8,6 +8,7 @@ template class IVisitor { public: + using TElement = typename JSONParser::Element; virtual const char * getName() const = 0; /** @@ -15,12 +16,13 @@ public: * @param element simdjson element */ virtual VisitorStatus visit(typename JSONParser::Element & element) = 0; + virtual VisitorStatus visitBatch(TElement & element, std::function & , bool ) = 0; - /** + /** * Applies this visitor to document, but does not mutate state * @param element simdjson element */ - virtual VisitorStatus apply(typename JSONParser::Element & element) const = 0; + virtual VisitorStatus apply(typename JSONParser::Element & element) = 0; /** * Restores visitor's initial state for later use diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h b/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h index 8446e1ff3be..efbd3d9fba6 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -10,33 +11,51 @@ template class VisitorJSONPathMemberAccess : public IVisitor { public: + using TElement = JSONParser::Element; explicit VisitorJSONPathMemberAccess(ASTPtr member_access_ptr_) : member_access_ptr(member_access_ptr_->as()) { } const char * getName() const override { return "VisitorJSONPathMemberAccess"; } - VisitorStatus apply(typename JSONParser::Element & element) const override + VisitorStatus apply(typename JSONParser::Element & element) override { typename JSONParser::Element result; - element.getObject().find(std::string_view(member_access_ptr->member_name), result); - element = result; + auto obj = element.getObject(); + if (!obj.find(std::string_view(member_access_ptr->member_name), result)) + { + return VisitorStatus::Error; + } + element = std::move(result); return VisitorStatus::Ok; } VisitorStatus visit(typename JSONParser::Element & element) override { + if (this->isExhausted()) + { + return VisitorStatus::Exhausted; + } this->setExhausted(true); if (!element.isObject()) { return VisitorStatus::Error; } - typename JSONParser::Element result; - if (!element.getObject().find(std::string_view(member_access_ptr->member_name), result)) - { + return apply(element); + } + + VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override + { + if (this->isExhausted()) + return VisitorStatus::Exhausted; + this->setExhausted(true); + if (!element.isObject()) return VisitorStatus::Error; - } - apply(element); - return VisitorStatus::Ok; + + auto status = apply(element); + if (status == VisitorStatus::Ok && can_reduce) + res_func(element); + + return status; } void reinitialize() override { this->setExhausted(false); } diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h b/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h index 708a71f7cf4..7cb429d59bb 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -18,23 +19,80 @@ public: const char * getName() const override { return "VisitorJSONPathRange"; } - VisitorStatus apply(typename JSONParser::Element & element) const override + VisitorStatus apply(typename JSONParser::Element & element) override { - typename JSONParser::Array array = element.getArray(); - element = array[current_index]; + element = (*array)[current_index]; return VisitorStatus::Ok; } + using TElement = JSONParser::Element; + VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override + { + if (!array && !element.isArray()) + { + this->setExhausted(true); + return VisitorStatus::Error; + } + + VisitorStatus status = VisitorStatus::Ok; + if (!array) + { + current_element = element; + array = current_element.getArray(); + if (!can_reduce) + array_size = array.value().size(); + } + + if (can_reduce) + { + std::set index_set{}; + for (auto range: range_ptr->ranges) + for (size_t i = range.first ; i < range.second; ++i) + index_set.insert(i); + + size_t idx = 0; + for (auto item: array.value()) + if (index_set.find(idx++) != index_set.end()) + res_func(item); + + this->setExhausted(true); + status = VisitorStatus::Ok; + } + else + { + if (current_index < array_size.value()) + { + apply(element); + status = VisitorStatus::Ok; + } + else + status = VisitorStatus::Ignore; + + if (current_index + 1 == range_ptr->ranges[current_range].second + && current_range + 1 == range_ptr->ranges.size()) + { + this->setExhausted(true); + } + } + + return status; + } VisitorStatus visit(typename JSONParser::Element & element) override { - if (!element.isArray()) + if (!array && !element.isArray()) { this->setExhausted(true); return VisitorStatus::Error; } VisitorStatus status; - if (current_index < element.getArray().size()) + if (!array) + { + current_element = element; + array = current_element.getArray(); + array_size = array.value().size(); + } + if (current_index < array_size.value()) { apply(element); status = VisitorStatus::Ok; @@ -58,6 +116,8 @@ public: current_range = 0; current_index = range_ptr->ranges[current_range].first; this->setExhausted(false); + array_size.reset(); + array.reset(); } void updateState() override @@ -74,6 +134,9 @@ private: ASTJSONPathRange * range_ptr; size_t current_range; UInt32 current_index; + std::optional array_size{}; + std::optional array{}; + typename JSONParser::Element current_element{}; }; } diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h b/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h index 71569d3c0a0..6f980073030 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h @@ -10,11 +10,12 @@ template class VisitorJSONPathRoot : public IVisitor { public: + using TElement = JSONParser::Element; explicit VisitorJSONPathRoot(ASTPtr) { } const char * getName() const override { return "VisitorJSONPathRoot"; } - VisitorStatus apply(typename JSONParser::Element & /*element*/) const override + VisitorStatus apply(typename JSONParser::Element & /*element*/) override { /// No-op on document, since we are already passed document's root return VisitorStatus::Ok; @@ -27,9 +28,19 @@ public: return VisitorStatus::Ok; } + VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override + { + apply(element); + this->setExhausted(true); + if (can_reduce) + res_func(element); + return VisitorStatus::Ok; + } + void reinitialize() override { this->setExhausted(false); } void updateState() override { } + }; } diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h b/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h index 0c297f64316..d36a323abe7 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -17,23 +18,27 @@ public: const char * getName() const override { return "VisitorJSONPathStar"; } - VisitorStatus apply(typename JSONParser::Element & element) const override + VisitorStatus apply(typename JSONParser::Element & element) override { - typename JSONParser::Array array = element.getArray(); - element = array[current_index]; + element = array.value()[current_index]; return VisitorStatus::Ok; } VisitorStatus visit(typename JSONParser::Element & element) override { - if (!element.isArray()) + if (!array && !element.isArray()) { this->setExhausted(true); return VisitorStatus::Error; } + if (!array_size) + { + array = element.getArray(); + array_size = array.value().size(); + } VisitorStatus status; - if (current_index < element.getArray().size()) + if (current_index < array_size.value()) { apply(element); status = VisitorStatus::Ok; @@ -46,11 +51,53 @@ public: return status; } + using TElement = JSONParser::Element; + VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override + { + if (!array && !element.isArray()) + { + this->setExhausted(true); + return VisitorStatus::Error; + } + + if (!array) + { + array = element.getArray(); + array_size = array.value().size(); + } + VisitorStatus status = VisitorStatus::Ok; + + if (can_reduce) + { + for (auto item: array.value()) + res_func(item); + + this->setExhausted(true); + } + else + { + if (current_index < array_size.value()) + { + apply(element); + status = VisitorStatus::Ok; + } + else + { + status = VisitorStatus::Ignore; + this->setExhausted(true); + } + + } + + return status; + } void reinitialize() override { current_index = 0; this->setExhausted(false); + array_size.reset(); + array.reset(); } void updateState() override @@ -59,7 +106,9 @@ public: } private: - UInt32 current_index; + UInt32 current_index{}; + std::optional array{}; + std::optional array_size{}; }; } From 2b39f6c019294d9bce1443a7dd0592d8ed0472c0 Mon Sep 17 00:00:00 2001 From: zhanglistar Date: Wed, 20 Nov 2024 16:40:09 +0800 Subject: [PATCH 3/3] fix style --- src/Common/ErrorCodes.cpp | 1 + src/Common/JSONParsers/SimdJSONParser.h | 7 +------ src/Formats/JSONExtractTree.cpp | 2 +- src/Functions/FunctionSQLJSON.h | 5 ----- src/Functions/FunctionsJSON.cpp | 2 +- src/Functions/JSONPath/Generator/IGenerator.h | 2 +- src/Functions/JSONPath/Generator/IVisitor.h | 13 +++++++++---- .../Generator/VisitorJSONPathMemberAccess.h | 2 +- .../JSONPath/Generator/VisitorJSONPathRange.h | 2 +- .../JSONPath/Generator/VisitorJSONPathRoot.h | 2 +- .../JSONPath/Generator/VisitorJSONPathStar.h | 2 +- 11 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 4471484a4af..f057dc2995f 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -614,6 +614,7 @@ M(734, CANNOT_WRITE_AFTER_BUFFER_CANCELED) \ M(735, QUERY_WAS_CANCELLED_BY_CLIENT) \ M(736, JSON_PARSE_ERROR) \ +\ M(900, DISTRIBUTED_CACHE_ERROR) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ M(902, PROTOCOL_VERSION_MISMATCH) \ diff --git a/src/Common/JSONParsers/SimdJSONParser.h b/src/Common/JSONParsers/SimdJSONParser.h index 70306cbe57e..d54fd11c10e 100644 --- a/src/Common/JSONParsers/SimdJSONParser.h +++ b/src/Common/JSONParsers/SimdJSONParser.h @@ -1,10 +1,8 @@ #pragma once -#include "Common/StackTrace.h" #include "config.h" #if USE_SIMDJSON -# include # include # include # include @@ -294,7 +292,7 @@ public: simdjson::ondemand::number_type nt{}; auto res = value.get_number_type().get(nt); chassert(res == simdjson::SUCCESS); - switch(nt) + switch (nt) { case simdjson::ondemand::number_type::signed_integer: format.number(value.get_int64().value_unsafe()); @@ -663,7 +661,6 @@ struct OnDemandSimdJSONParser ALWAYS_INLINE KeyValuePair operator*() const { - //SIMDJSON_ASSIGN_OR_THROW(auto field_wrapper, *it); auto field_wrapper = *it; if (field_wrapper.error()) { @@ -716,8 +713,6 @@ struct OnDemandSimdJSONParser /// Optional: Provides access to an object's element by index. KeyValuePair operator[](size_t index) const { - ///SIMDJSON_ASSIGN_OR_THROW(auto b, object.reset()); - ///(void)b; SIMDJSON_ASSIGN_OR_THROW(auto it, object.begin()); while (index--) { diff --git a/src/Formats/JSONExtractTree.cpp b/src/Formats/JSONExtractTree.cpp index 17b59e80d6f..b8b70ce3d07 100644 --- a/src/Formats/JSONExtractTree.cpp +++ b/src/Formats/JSONExtractTree.cpp @@ -1324,7 +1324,7 @@ public: size_t old_size = tuple_col.size(); auto object = element.getObject(); auto it = object.begin(); - size_t object_size{}; + size_t object_size = 0; for (; it != object.end(); ++it) { ++object_size; diff --git a/src/Functions/FunctionSQLJSON.h b/src/Functions/FunctionSQLJSON.h index 300c6152b4d..58e18df23b0 100644 --- a/src/Functions/FunctionSQLJSON.h +++ b/src/Functions/FunctionSQLJSON.h @@ -368,7 +368,6 @@ public: /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6), /// however this functionality is not implemented yet } - //current_element = root; } if (status == VisitorStatus::Exhausted) @@ -420,10 +419,8 @@ public: bool success = false; const char * array_begin = "["; const char * array_end = "]"; - //const char * comma = ", "; JSONStringSerializer json_serializer(col_str); json_serializer.addRawData(array_begin, 1); - //std::vector result{}; std::function result_func= [&json_serializer](const Element & element) { json_serializer.addElement(element); }; while ((status = generator_json_path.getNextItemBatch(current_element, result_func)) != VisitorStatus::Exhausted) { @@ -437,8 +434,6 @@ public: /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6), /// however this functionality is not implemented yet } - //current_element = root; - //result.clear(); } if (!success) { diff --git a/src/Functions/FunctionsJSON.cpp b/src/Functions/FunctionsJSON.cpp index d2457784d00..e1815515ff9 100644 --- a/src/Functions/FunctionsJSON.cpp +++ b/src/Functions/FunctionsJSON.cpp @@ -623,7 +623,7 @@ public: static bool insertResultToColumn(IColumn & dest, const Element & element, std::string_view, const FormatSettings &, String &) { - size_t size{}; + size_t size; if (element.isArray()) size = element.getArray().size(); else if (element.isObject()) diff --git a/src/Functions/JSONPath/Generator/IGenerator.h b/src/Functions/JSONPath/Generator/IGenerator.h index a91c940316d..ffb44d8ad42 100644 --- a/src/Functions/JSONPath/Generator/IGenerator.h +++ b/src/Functions/JSONPath/Generator/IGenerator.h @@ -24,7 +24,7 @@ public: */ virtual VisitorStatus getNextItem(TElement & element) = 0; - virtual VisitorStatus getNextItemBatch(TElement & element, std::function & ) = 0; + virtual VisitorStatus getNextItemBatch(TElement & element, std::function & res_func) = 0; virtual ~IGenerator() = default; }; diff --git a/src/Functions/JSONPath/Generator/IVisitor.h b/src/Functions/JSONPath/Generator/IVisitor.h index 1a39a09bad6..72c3196efd5 100644 --- a/src/Functions/JSONPath/Generator/IVisitor.h +++ b/src/Functions/JSONPath/Generator/IVisitor.h @@ -15,14 +15,19 @@ public: * Applies this visitor to document and mutates its state * @param element simdjson element */ - virtual VisitorStatus visit(typename JSONParser::Element & element) = 0; - virtual VisitorStatus visitBatch(TElement & element, std::function & , bool ) = 0; + virtual VisitorStatus visit(TElement & element) = 0; - /** + /** + * Applies this visitor to document and mutates its state, returning a batch of results + * @param element simdjson element + */ + virtual VisitorStatus visitBatch(TElement &element, std::function & res_func, bool can_reduce) = 0; + + /** * Applies this visitor to document, but does not mutate state * @param element simdjson element */ - virtual VisitorStatus apply(typename JSONParser::Element & element) = 0; + virtual VisitorStatus apply(typename JSONParser::Element & element) const = 0; /** * Restores visitor's initial state for later use diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h b/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h index efbd3d9fba6..ec36a67aba1 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h @@ -17,7 +17,7 @@ public: const char * getName() const override { return "VisitorJSONPathMemberAccess"; } - VisitorStatus apply(typename JSONParser::Element & element) override + VisitorStatus apply(typename JSONParser::Element & element) const override { typename JSONParser::Element result; auto obj = element.getObject(); diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h b/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h index 7cb429d59bb..09763fe0b5b 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h @@ -19,7 +19,7 @@ public: const char * getName() const override { return "VisitorJSONPathRange"; } - VisitorStatus apply(typename JSONParser::Element & element) override + VisitorStatus apply(typename JSONParser::Element & element) const override { element = (*array)[current_index]; return VisitorStatus::Ok; diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h b/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h index 6f980073030..cc9001110db 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h @@ -15,7 +15,7 @@ public: const char * getName() const override { return "VisitorJSONPathRoot"; } - VisitorStatus apply(typename JSONParser::Element & /*element*/) override + VisitorStatus apply(typename JSONParser::Element & /*element*/) const override { /// No-op on document, since we are already passed document's root return VisitorStatus::Ok; diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h b/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h index d36a323abe7..51b499bd892 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h @@ -18,7 +18,7 @@ public: const char * getName() const override { return "VisitorJSONPathStar"; } - VisitorStatus apply(typename JSONParser::Element & element) override + VisitorStatus apply(typename JSONParser::Element & element) const override { element = array.value()[current_index]; return VisitorStatus::Ok;