diff --git a/base/base/defines.h b/base/base/defines.h
index a0c3c0d1de5..c4bd0b2ee5a 100644
--- a/base/base/defines.h
+++ b/base/base/defines.h
@@ -165,3 +165,12 @@ template
 constexpr void UNUSED(Args &&... args [[maybe_unused]]) // NOLINT(cppcoreguidelines-missing-std-forward)
 {
 }
+
+/// Token pasting that macro-expands its arguments first (hence the extra _IMPL level).
+#define DB_CONCATENATE_IMPL(s1, s2) s1##s2
+#define DB_CONCATENATE(s1, s2) DB_CONCATENATE_IMPL(s1, s2)
+
+/// Builds an identifier of the form `str` + __COUNTER__ + `_` + __LINE__, for variables declared inside macros.
+#define DB_ANONYMOUS_VARIABLE(str) \
+    DB_CONCATENATE(DB_CONCATENATE(DB_CONCATENATE(str, __COUNTER__), _), __LINE__)
+
diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp
index 376ccf6f297..f057dc2995f 100644
--- a/src/Common/ErrorCodes.cpp
+++ b/src/Common/ErrorCodes.cpp
@@ -613,6 +613,7 @@
     M(733, TABLE_IS_BEING_RESTARTED) \
     M(734, CANNOT_WRITE_AFTER_BUFFER_CANCELED) \
     M(735, QUERY_WAS_CANCELLED_BY_CLIENT) \
+    M(736, JSON_PARSE_ERROR) \
     \
     M(900, DISTRIBUTED_CACHE_ERROR) \
     M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
diff --git a/src/Common/JSONParsers/DummyJSONParser.h b/src/Common/JSONParsers/DummyJSONParser.h
index 394a2817ea1..12432e2ecad 100644
--- a/src/Common/JSONParsers/DummyJSONParser.h
+++ b/src/Common/JSONParsers/DummyJSONParser.h
@@ -87,7 +87,7 @@ struct DummyJSONParser
         static Iterator end() { return {}; }
         static size_t size() { return 0; }
         bool find(std::string_view, Element &) const { return false; } /// NOLINT
-
+        bool reset() { return true; }
 #if 0
         /// Optional: Provides access to an object's element by index.
         KeyValuePair operator[](size_t) const { return {}; }
diff --git a/src/Common/JSONParsers/SimdJSONParser.h b/src/Common/JSONParsers/SimdJSONParser.h
index db679b14f52..d54fd11c10e 100644
--- a/src/Common/JSONParsers/SimdJSONParser.h
+++ b/src/Common/JSONParsers/SimdJSONParser.h
@@ -18,8 +18,19 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int CANNOT_ALLOCATE_MEMORY;
+    extern const int JSON_PARSE_ERROR;
 }
 
+#define SIMDJSON_ASSIGN_OR_THROW_IMPL(_result, _lhs, _rexpr) \
+    auto && _result = (_rexpr); \
+    if (_result.error() != ::simdjson::SUCCESS) \
+        throw DB::Exception(DB::ErrorCodes::JSON_PARSE_ERROR, "simdjson error: {}", std::string(::simdjson::error_message(_result.error()))); \
+    _lhs = std::move(_result).value_unsafe()
+/// Usage: SIMDJSON_ASSIGN_OR_THROW(auto x, expr); it expands to several statements, so never use it as the sole body of an unbraced if/for.
+#define SIMDJSON_ASSIGN_OR_THROW(_lhs, _rexpr) \
+    SIMDJSON_ASSIGN_OR_THROW_IMPL( \
+        DB_ANONYMOUS_VARIABLE(_simdjson_result), _lhs, _rexpr)
+
 /// Format elements of basic types into string.
 /// The original implementation is mini_formatter in simdjson.h. But it is not public API, so we
 /// add a implementation here.
@@ -264,13 +275,93 @@ public:
         format.key(kv.key);
         append(kv.value);
     }
+
+    void append(simdjson::ondemand::value value)
+    {
+        switch (value.type())
+        {
+            case simdjson::ondemand::json_type::array:
+                append(value.get_array());
+                break;
+            case simdjson::ondemand::json_type::object:
+                append(value.get_object());
+                break;
+            case simdjson::ondemand::json_type::number:
+            {
+
+                simdjson::ondemand::number_type nt{};
+                auto res = value.get_number_type().get(nt);
+                chassert(res == simdjson::SUCCESS);
+                switch (nt)
+                {
+                    case simdjson::ondemand::number_type::signed_integer:
+                        format.number(value.get_int64().value_unsafe());
+                        break;
+                    case simdjson::ondemand::number_type::unsigned_integer:
+                        format.number(value.get_uint64().value_unsafe());
+                        break;
+                    case simdjson::ondemand::number_type::floating_point_number:
+                        format.number(value.get_double().value_unsafe());
+                        break;
+                    case simdjson::ondemand::number_type::big_integer:
+                        format.string(value.get_string().value_unsafe()); /// NOTE(review): get_string() on a number token presumably fails and value_unsafe() hides the error - confirm big integers are serialized as intended
+                        break;
+                }
+                break;
+            }
+            case simdjson::ondemand::json_type::string:
+                format.string(value.get_string().value_unsafe());
+                break;
+            case simdjson::ondemand::json_type::boolean:
+                if (value.get_bool().value_unsafe())
+                    format.trueAtom();
+                else
+                    format.falseAtom();
+                break;
+            case simdjson::ondemand::json_type::null:
+                format.nullAtom();
+                break;
+        }
+    }
+
+    void append(simdjson::ondemand::array array)
+    {
+        format.startArray();
+        int i = 0;
+        for (simdjson::ondemand::value value : array)
+        {
+            if (i++ != 0)
+                format.comma();
+            append(value);
+        }
+        format.endArray();
+    }
+
+    void append(simdjson::ondemand::object object)
+    {
+        format.startObject();
+        int i = 0;
+        for (simdjson::ondemand::field field : object)
+        {
+            if (i++ != 0)
+                format.comma();
+            append(field);
+        }
+        format.endObject();
+    }
+
+    void append(simdjson::ondemand::field field)
+    {
+        format.key(field.unescaped_key());
+        append(field.value());
+    }
 private:
     SimdJSONBasicFormatter format;
 };
 
 /// This class can be used as an argument for the template class FunctionJSON.
 /// It provides ability to parse JSONs using simdjson library.
-struct SimdJSONParser
+struct DomSimdJSONParser
 {
     class Array;
     class Object;
@@ -419,16 +510,258 @@ private:
     simdjson::dom::parser parser;
 };
 
-inline ALWAYS_INLINE SimdJSONParser::Array SimdJSONParser::Element::getArray() const
+inline ALWAYS_INLINE DomSimdJSONParser::Array DomSimdJSONParser::Element::getArray() const
 {
     return element.get_array().value_unsafe();
 }
 
-inline ALWAYS_INLINE SimdJSONParser::Object SimdJSONParser::Element::getObject() const
+inline ALWAYS_INLINE DomSimdJSONParser::Object DomSimdJSONParser::Element::getObject() const
 {
     return element.get_object().value_unsafe();
 }
 
+struct OnDemandSimdJSONParser
+{
+    class Array;
+    class Object;
+
+    /// References an element in a JSON document, representing a JSON null, boolean, string, number,
+    /// array or object.
+    class Element
+    {
+    public:
+        ALWAYS_INLINE Element() {} /// NOLINT
+        ALWAYS_INLINE Element(simdjson::ondemand::value && value_) { value = std::move(value_); }
+        ALWAYS_INLINE Element & operator=(const simdjson::ondemand::value & value_) { value = value_; return *this; }
+
+        ALWAYS_INLINE ElementType type() const
+        {
+            if (value.type() == simdjson::ondemand::json_type::object)
+                return ElementType::OBJECT;
+            if (value.type() == simdjson::ondemand::json_type::array)
+                return ElementType::ARRAY;
+            if (value.type() == simdjson::ondemand::json_type::boolean)
+                return ElementType::BOOL;
+            if (value.type() == simdjson::ondemand::json_type::string)
+                return ElementType::STRING;
+            if (value.type() == simdjson::ondemand::json_type::number)
+            {
+                auto res = value.get_number_type();
+                if (res.error())
+                    return ElementType::NULL_VALUE;
+                if (res.value() == simdjson::ondemand::number_type::signed_integer)
+                    return ElementType::INT64;
+                if (res.value() == simdjson::ondemand::number_type::unsigned_integer)
+                    return ElementType::UINT64;
+                if (res.value() == simdjson::ondemand::number_type::floating_point_number)
+                    return ElementType::DOUBLE;
+            }
+            return ElementType::NULL_VALUE;
+        }
+
+        ALWAYS_INLINE bool isInt64() const { auto res = value.get_number_type(); return !res.error() && res.value() == simdjson::ondemand::number_type::signed_integer; }
+        ALWAYS_INLINE bool isUInt64() const { auto res = value.get_number_type(); return !res.error() && res.value() == simdjson::ondemand::number_type::unsigned_integer; }
+        ALWAYS_INLINE bool isDouble() const { auto res = value.get_number_type(); return !res.error() && res.value() == simdjson::ondemand::number_type::floating_point_number; }
+        ALWAYS_INLINE bool isString() const { auto r = value.type(); return !r.error() && r.value() == simdjson::ondemand::json_type::string; }
+        ALWAYS_INLINE bool isArray() const
+        {
+            auto r = value.type();
+            return !r.error() && r.value() == simdjson::ondemand::json_type::array;
+        }
+        ALWAYS_INLINE bool isObject() const
+        {
+            auto r = value.type();
+            return !r.error() && r.value() == simdjson::ondemand::json_type::object;
+        }
+        ALWAYS_INLINE bool isBool() const { auto r = value.type(); return !r.error() && r.value() == simdjson::ondemand::json_type::boolean; } /// error-checked like isString()/isArray()
+        ALWAYS_INLINE bool isNull() const { auto r = value.type(); return !r.error() && r.value() == simdjson::ondemand::json_type::null; } /// error-checked like isString()/isArray()
+
+        ALWAYS_INLINE Int64 getInt64() const { return value.get_int64().value(); }
+        ALWAYS_INLINE UInt64 getUInt64() const { return value.get_uint64().value(); }
+        ALWAYS_INLINE double getDouble() const { return value.get_double().value(); }
+        ALWAYS_INLINE bool getBool() const { return value.get_bool().value(); }
+        ALWAYS_INLINE std::string_view getString() const
+        {
+            auto r = value.get_string();
+            if (r.error())
+                return {};
+            return r.value();
+        }
+        ALWAYS_INLINE Array getArray() const
+        {
+            SIMDJSON_ASSIGN_OR_THROW(auto arr, value.get_array());
+            return arr;
+        }
+        ALWAYS_INLINE Object getObject() const
+        {
+            SIMDJSON_ASSIGN_OR_THROW(auto obj, value.get_object());
+            return obj;
+        }
+
+        ALWAYS_INLINE simdjson::ondemand::value getElement() const { return value; }
+
+    private:
+        mutable simdjson::ondemand::value value;
+    };
+
+    /// References an array in a JSON document.
+    class Array
+    {
+    public:
+        class Iterator
+        {
+        public:
+            Iterator() = default;
+            ALWAYS_INLINE Iterator(const simdjson::ondemand::array_iterator & it_) : it(it_) {} /// NOLINT
+            ALWAYS_INLINE Element operator*() const { return (*it).value(); }
+            ALWAYS_INLINE Iterator & operator++() { ++it; return *this; }
+            ALWAYS_INLINE friend bool operator!=(const Iterator & left, const Iterator & right) { return left.it != right.it; }
+            ALWAYS_INLINE friend bool operator==(const Iterator & left, const Iterator & right) { return !(left != right); }
+        private:
+            mutable simdjson::ondemand::array_iterator it;
+        };
+
+        ALWAYS_INLINE Array(const simdjson::ondemand::array & array_) : array(array_) {} /// NOLINT
+        ALWAYS_INLINE Iterator begin() const { return array.begin().value(); }
+        ALWAYS_INLINE Iterator end() const { return array.end().value(); }
+        ALWAYS_INLINE size_t size() const { return array.count_elements().value(); }
+        /// Indexed access on top of an iterator that is only ever advanced with `++it`: the position
+        /// reached so far is cached in `it` / `last_index`.
+        ALWAYS_INLINE Element operator[](size_t index) const
+        {
+            if (index < last_index)
+            {
+                /// Going backwards: rewind the array and forget the cached position. If `last_index`
+                /// were left untouched, `index - last_index` below would wrap around (size_t) and the
+                /// iterator would not be re-created by the `last_index == 0` branch.
+                array.reset();
+                last_index = 0;
+            }
+            if (last_index == 0)
+            {
+                SIMDJSON_ASSIGN_OR_THROW(auto iter, array.begin());
+                it = iter;
+            }
+            size_t diff = index - last_index;
+            while (diff--)
+                ++it;
+            last_index = index;
+            SIMDJSON_ASSIGN_OR_THROW(auto ele, *it);
+            return ele;
+        }
+
+    private:
+        mutable size_t last_index{};
+        mutable simdjson::ondemand::array_iterator it;
+        mutable simdjson::ondemand::array array;
+    };
+
+    using KeyValuePair = std::pair;
+
+    /// References an object in a JSON document.
+    class Object
+    {
+    public:
+        class Iterator
+        {
+        public:
+            ALWAYS_INLINE Iterator(const simdjson::ondemand::object_iterator & it_) : it(it_) {} /// NOLINT
+
+            ALWAYS_INLINE KeyValuePair operator*() const
+            {
+                auto field_wrapper = *it;
+                if (field_wrapper.error())
+                {
+                    return {};
+                }
+                std::string_view key;
+                auto key_error = field_wrapper.unescaped_key().get(key);
+                if (key_error)
+                {
+                    return {};
+                }
+                ::simdjson::ondemand::value v = field_wrapper.value();
+                return {key, Element(std::move(v))};
+            }
+
+            ALWAYS_INLINE Iterator & operator++() { ++it; return *this; }
+            ALWAYS_INLINE friend bool operator!=(const Iterator & left, const Iterator & right) { return left.it != right.it; }
+            ALWAYS_INLINE friend bool operator==(const Iterator & left, const Iterator & right) { return !(left != right); }
+        private:
+            mutable simdjson::ondemand::object_iterator it;
+        };
+
+        ALWAYS_INLINE Object(const simdjson::ondemand::object & object_) : object(object_) {} /// NOLINT
+        ALWAYS_INLINE Iterator begin() const { return object.begin().value(); }
+        ALWAYS_INLINE Iterator end() const { return object.end().value(); }
+        /// NOTE: call size() before iterating
+        ALWAYS_INLINE size_t size() const
+        {
+            return object.count_fields().value();
+        }
+
+        bool find(std::string_view key, Element & result) const
+        {
+            auto x = object.find_field_unordered(key);
+            if (x.error())
+                return false;
+
+            result = x.value_unsafe();
+            return true;
+        }
+
+        bool reset()
+        {
+            auto v = object.reset();
+            if (v.error())
+                return false;
+            return true;
+        }
+
+        /// Optional: Provides access to an object's element by index.
+        KeyValuePair operator[](size_t index) const
+        {
+            SIMDJSON_ASSIGN_OR_THROW(auto it, object.begin());
+            while (index--)
+            {
+                (void)*(it); /// NEED TO DO THIS TO ITERATE
+                ++it;
+            }
+            SIMDJSON_ASSIGN_OR_THROW(auto field, *it);
+            std::string_view key = field.unescaped_key().value();
+            simdjson::ondemand::value value = field.value();
+            return {key, Element(std::move(value))};
+        }
+
+    private:
+        mutable simdjson::ondemand::object object;
+    };
+
+    /// Parses a JSON document, returns the reference to its root element if succeeded.
+    bool parse(std::string_view json, Element & result)
+    {
+        padstr = json;
+        auto res = parser.iterate(padstr);
+        if (res.error())
+            return false;
+
+        document = std::move(res.value());
+        auto v = document.get_value();
+        if (v.error())
+            return false;
+
+        result = v.value();
+        return true;
+    }
+
+private:
+    simdjson::ondemand::parser parser;
+    simdjson::ondemand::document document{};
+    simdjson::padded_string padstr;
+};
+
+using SimdJSONParser = OnDemandSimdJSONParser;
+
 }
 
 #endif
diff --git a/src/Formats/JSONExtractTree.cpp b/src/Formats/JSONExtractTree.cpp
index 4ed65984c86..fe356be5712 100644
--- a/src/Formats/JSONExtractTree.cpp
+++ b/src/Formats/JSONExtractTree.cpp
@@ -1211,9 +1211,9 @@ public:
         auto array = element.getArray();
         auto it = array.begin();
 
-        for (size_t index = 0; (index != nested.size()) && (it != array.end()); ++index)
+        for (size_t index = 0; (index != nested.size()) && (it != array.end()); ++index, ++it)
         {
-            if (nested[index]->insertResultToColumn(tuple.getColumn(index), *it++, insert_settings, format_settings, error))
+            if (nested[index]->insertResultToColumn(tuple.getColumn(index), *it, insert_settings, format_settings, error))
             {
                 were_valid_elements = true;
             }
@@ -1239,9 +1239,9 @@ public:
         if (name_to_index_map.empty())
         {
             auto it = object.begin();
-            for (size_t index = 0; (index != nested.size()) && (it != object.end()); ++index)
+            for (size_t index = 0; (index != nested.size()) && (it != object.end()); ++index, ++it)
             {
-                if (nested[index]->insertResultToColumn(tuple.getColumn(index), (*it++).second, insert_settings, format_settings, error))
+                if (nested[index]->insertResultToColumn(tuple.getColumn(index), (*it).second, insert_settings, format_settings, error))
                 {
                     were_valid_elements = true;
                 }
@@ -1316,18 +1316,18 @@ public:
             error = fmt::format("cannot read Map value from JSON element: {}", jsonElementToString(element, format_settings));
             return false;
         }
-
         auto & map_col = assert_cast(column);
         auto & offsets = map_col.getNestedColumn().getOffsets();
         auto & tuple_col = map_col.getNestedData();
         auto & key_col = tuple_col.getColumn(0);
         auto & value_col = tuple_col.getColumn(1);
         size_t old_size = tuple_col.size();
-
         auto object = element.getObject();
         auto it = object.begin();
+        size_t object_size = 0;
         for (; it != object.end(); ++it)
         {
+            ++object_size;
             auto pair = *it;
 
             /// Insert key
@@ -1350,7 +1350,7 @@ public:
             }
         }
 
-        offsets.push_back(old_size + object.size());
+        offsets.push_back(old_size + object_size);
         return true;
     }
 
diff --git a/src/Functions/FunctionSQLJSON.h b/src/Functions/FunctionSQLJSON.h
index d56e655e126..58e18df23b0 100644
--- a/src/Functions/FunctionSQLJSON.h
+++ b/src/Functions/FunctionSQLJSON.h
@@ -102,8 +102,12 @@ public:
     }
 
     /// serialize the json element into column's buffer directly
-    void addElement(const Element & element)
+    void addElement(const Element & element, const std::string & sep = ", ") /// `sep` is written before every element except the first
     {
+        if (is_first)
+            is_first = false;
+        else
+            addRawData(sep.data(), sep.size());
         formatter.append(element.getElement());
     }
     void commit()
@@ -121,7 +125,7 @@ private:
     IColumn::Offsets & offsets;
     Formatter formatter;
     size_t prev_offset;
-
+    bool is_first{true};
 };
 
 
@@ -364,7 +368,6 @@ public:
                 /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6),
                 /// however this functionality is not implemented yet
             }
-            current_element = root;
         }
 
         if (status == VisitorStatus::Exhausted)
@@ -416,19 +419,14 @@ public:
         bool success = false;
         const char * array_begin = "[";
         const char * array_end = "]";
-        const char * comma = ", ";
         JSONStringSerializer json_serializer(col_str);
         json_serializer.addRawData(array_begin, 1);
-        while ((status = generator_json_path.getNextItem(current_element)) != VisitorStatus::Exhausted)
+        std::function result_func = [&json_serializer](const Element & element) { json_serializer.addElement(element); };
+        while ((status = generator_json_path.getNextItemBatch(current_element, result_func)) != VisitorStatus::Exhausted)
         {
             if (status == VisitorStatus::Ok)
             {
-                if (success)
-                {
-                    json_serializer.addRawData(comma, 2);
-                }
                 success = true;
-                json_serializer.addElement(current_element);
             }
             else if (status == VisitorStatus::Error)
             {
@@ -436,7 +434,6 @@ public:
                 /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6),
                 /// however this functionality is not implemented yet
             }
-            current_element = root;
         }
         if (!success)
         {
diff --git a/src/Functions/FunctionsJSON.cpp b/src/Functions/FunctionsJSON.cpp
index 52aaadf33af..e1815515ff9 100644
--- a/src/Functions/FunctionsJSON.cpp
+++ b/src/Functions/FunctionsJSON.cpp
@@ -273,12 +273,13 @@ private:
         if (element.isArray())
         {
             auto array = element.getArray();
+            size_t array_size = array.size();
             if (index >= 0)
                 --index;
             else
-                index += array.size();
+                index += array_size;
 
-            if (static_cast(index) >= array.size())
+            if (static_cast(index) >= array_size)
                 return false;
             element = array[index];
             out_key = {};
@@ -290,12 +291,13 @@ private:
         if (element.isObject())
         {
             auto object = element.getObject();
+            size_t object_size = object.size();
             if (index >= 0)
                 --index;
             else
-                index += object.size();
+                index += object_size;
 
-            if (static_cast(index) >= object.size())
+            if (static_cast(index) >= object_size)
                 return false;
             std::tie(out_key, element) = object[index];
             return true;
@@ -985,10 +987,13 @@ public:
         auto array = element.getArray();
         ColumnArray & col_res = assert_cast(dest);
 
+        size_t size = 0;
         for (auto value : array)
+        {
+            ++size;
             JSONExtractRawImpl::insertResultToColumn(col_res.getData(), value, {}, format_settings, error);
-
-        col_res.getOffsets().push_back(col_res.getOffsets().back() + array.size());
+        }
+        col_res.getOffsets().push_back(col_res.getOffsets().back() + size);
         return true;
     }
 };
@@ -1021,13 +1026,15 @@ public:
         auto & col_key = assert_cast(col_tuple.getColumn(0));
         auto & col_value = assert_cast(col_tuple.getColumn(1));
 
+        size_t size = 0;
         for (const auto & [key, value] : object)
         {
             col_key.insertData(key.data(), key.size());
             JSONExtractRawImpl::insertResultToColumn(col_value, value, {}, format_settings, error);
+            ++size;
         }
 
-        col_arr.getOffsets().push_back(col_arr.getOffsets().back() + object.size());
+        col_arr.getOffsets().push_back(col_arr.getOffsets().back() + size);
         return true;
     }
 };
@@ -1055,12 +1062,14 @@ public:
         ColumnArray & col_res = assert_cast(dest);
         auto & col_key = assert_cast(col_res.getData());
 
+        size_t count = 0;
         for (const auto & [key, value] : object)
         {
+            ++count;
             col_key.insertData(key.data(), key.size());
         }
 
-        col_res.getOffsets().push_back(col_res.getOffsets().back() + object.size());
+        col_res.getOffsets().push_back(col_res.getOffsets().back() + count);
         return true;
     }
 };
diff --git a/src/Functions/JSONPath/Generator/GeneratorJSONPath.h b/src/Functions/JSONPath/Generator/GeneratorJSONPath.h
index 1016d776be0..02ef44a3adc 100644
--- a/src/Functions/JSONPath/Generator/GeneratorJSONPath.h
+++ b/src/Functions/JSONPath/Generator/GeneratorJSONPath.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include
 #include
 #include
 #include
@@ -21,6 +22,7 @@ template
 class GeneratorJSONPath : public IGenerator
 {
 public:
+    using TElement = typename JSONParser::Element;
     /**
      * Traverses children ASTs of ASTJSONPathQuery and creates a vector of corresponding visitors
      * @param query_ptr_ pointer to ASTJSONPathQuery
@@ -80,11 +82,6 @@ public:
             return VisitorStatus::Exhausted;
         }
 
-        for (int i = 0; i < current_visitor; ++i)
-        {
-            visitors[i]->apply(current);
-        }
-
         VisitorStatus status = VisitorStatus::Error;
         for (size_t i = current_visitor; i < visitors.size(); ++i)
         {
@@ -105,6 +102,31 @@ public:
         }
     }
 
+    VisitorStatus getNextItemBatch(TElement & element, std::function & res_func) override
+    {
+        while (true)
+        {
+            /// element passed to us actually is root, so here we assign current to root
+            auto current = element;
+            if (current_visitor < 0)
+                return VisitorStatus::Exhausted;
+
+            VisitorStatus status = VisitorStatus::Error;
+            size_t visitor_size = visitors.size();
+            for (size_t i = current_visitor; i < visitor_size; ++i)
+            {
+                status = visitors[i]->visitBatch(current, res_func, i == visitor_size - 1); /// only the last visitor may emit results
+                current_visitor = static_cast(i);
+                if (status == VisitorStatus::Error || status == VisitorStatus::Ignore)
+                    break;
+            }
+            updateVisitorsForNextRun();
+
+            if (status != VisitorStatus::Ignore)
+                return status;
+        }
+    }
+
     void reinitialize()
     {
         while (current_visitor >= 0)
diff --git a/src/Functions/JSONPath/Generator/IGenerator.h b/src/Functions/JSONPath/Generator/IGenerator.h
index d3a6c32de72..ffb44d8ad42 100644
--- a/src/Functions/JSONPath/Generator/IGenerator.h
+++ b/src/Functions/JSONPath/Generator/IGenerator.h
@@ -10,6 +10,8 @@ template
 class IGenerator
 {
 public:
+    using TElement = typename JSONParser::Element;
+
     IGenerator() = default;
 
     virtual const char * getName() const = 0;
@@ -20,7 +22,9 @@ public:
      * @param element to be extracted into
      * @return true if generator is not exhausted
      */
-    virtual VisitorStatus getNextItem(typename JSONParser::Element & element) = 0;
+    virtual VisitorStatus getNextItem(TElement & element) = 0;
+
+    virtual VisitorStatus getNextItemBatch(TElement & element, std::function & res_func) = 0;
 
     virtual ~IGenerator() = default;
 };
diff --git a/src/Functions/JSONPath/Generator/IVisitor.h b/src/Functions/JSONPath/Generator/IVisitor.h
index 1a94106a435..72c3196efd5 100644
--- a/src/Functions/JSONPath/Generator/IVisitor.h
+++ b/src/Functions/JSONPath/Generator/IVisitor.h
@@ -8,13 +8,22 @@ template
 class IVisitor
 {
 public:
+    using TElement = typename JSONParser::Element;
     virtual const char * getName() const = 0;
 
     /**
      * Applies this visitor to document and mutates its state
      * @param element simdjson element
      */
-    virtual VisitorStatus visit(typename JSONParser::Element & element) = 0;
+    virtual VisitorStatus visit(TElement & element) = 0;
+
+    /**
+     * Applies this visitor to document and mutates its state, returning a batch of results
+     * @param element simdjson element
+     * @param res_func callback that receives each produced element when can_reduce is set
+     * @param can_reduce whether this visitor may pass its results to res_func
+     */
+    virtual VisitorStatus visitBatch(TElement &element, std::function & res_func, bool can_reduce) = 0;
 
     /**
      * Applies this visitor to document, but does not mutate state
diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h b/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h
index 8446e1ff3be..ec36a67aba1 100644
--- a/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h
+++ b/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include
 #include
 #include
 #include
@@ -10,6 +11,7 @@ template
 class VisitorJSONPathMemberAccess : public IVisitor
 {
 public:
+    using TElement = typename JSONParser::Element;
     explicit VisitorJSONPathMemberAccess(ASTPtr member_access_ptr_)
         : member_access_ptr(member_access_ptr_->as()) { }
 
@@ -18,25 +20,42 @@ public:
     VisitorStatus apply(typename JSONParser::Element & element) const override
     {
         typename JSONParser::Element result;
-        element.getObject().find(std::string_view(member_access_ptr->member_name), result);
-        element = result;
+        auto obj = element.getObject();
+        if (!obj.find(std::string_view(member_access_ptr->member_name), result))
+        {
+            return VisitorStatus::Error;
+        }
+        element = std::move(result);
         return VisitorStatus::Ok;
     }
 
     VisitorStatus visit(typename JSONParser::Element & element) override
     {
+        if (this->isExhausted())
+        {
+            return VisitorStatus::Exhausted;
+        }
         this->setExhausted(true);
         if (!element.isObject())
         {
             return VisitorStatus::Error;
         }
-        typename JSONParser::Element result;
-        if (!element.getObject().find(std::string_view(member_access_ptr->member_name), result))
-        {
+        return apply(element);
+    }
+
+    VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override
+    {
+        if (this->isExhausted())
+            return VisitorStatus::Exhausted;
+        this->setExhausted(true);
+        if (!element.isObject())
             return VisitorStatus::Error;
-        }
-        apply(element);
-        return VisitorStatus::Ok;
+
+        auto status = apply(element);
+        if (status == VisitorStatus::Ok && can_reduce)
+            res_func(element);
+
+        return status;
     }
 
     void reinitialize() override { this->setExhausted(false); }
diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h b/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h
index 708a71f7cf4..09763fe0b5b 100644
--- a/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h
+++ b/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include
 #include
 #include
 #include
@@ -20,21 +21,85 @@ public:
 
     VisitorStatus apply(typename JSONParser::Element & element) const override
     {
-        typename JSONParser::Array array = element.getArray();
-        element = array[current_index];
+        element = (*array)[current_index];
         return VisitorStatus::Ok;
     }
 
+    using TElement = typename JSONParser::Element;
+    VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override
+    {
+        if (!array && !element.isArray())
+        {
+            this->setExhausted(true);
+            return VisitorStatus::Error;
+        }
+
+        VisitorStatus status = VisitorStatus::Ok;
+        if (!array)
+        {
+            current_element = element;
+            array = current_element.getArray();
+            if (!can_reduce)
+                array_size = array.value().size();
+        }
+
+        if (can_reduce)
+        {
+            /// Check each position against the requested ranges directly. Materializing every index
+            /// of every range in a std::set would take time and memory proportional to the range
+            /// width, no matter how short the array actually is.
+            const auto & ranges = range_ptr->ranges;
+            auto is_selected = [&ranges](size_t pos)
+            {
+                for (const auto & range : ranges)
+                    if (range.first <= pos && pos < range.second)
+                        return true;
+                return false;
+            };
+
+            size_t idx = 0;
+            for (auto item: array.value())
+                if (is_selected(idx++))
+                    res_func(item);
+
+            this->setExhausted(true);
+            status = VisitorStatus::Ok;
+        }
+        else
+        {
+            if (current_index < array_size.value())
+            {
+                apply(element);
+                status = VisitorStatus::Ok;
+            }
+            else
+                status = VisitorStatus::Ignore;
+
+            if (current_index + 1 == range_ptr->ranges[current_range].second
+                && current_range + 1 == range_ptr->ranges.size())
+            {
+                this->setExhausted(true);
+            }
+        }
+
+        return status;
+    }
 
     VisitorStatus visit(typename JSONParser::Element & element) override
     {
-        if (!element.isArray())
+        if (!array && !element.isArray())
         {
             this->setExhausted(true);
             return VisitorStatus::Error;
         }
 
         VisitorStatus status;
-        if (current_index < element.getArray().size())
+        if (!array)
+        {
+            current_element = element;
+            array = current_element.getArray();
+            array_size = array.value().size();
+        }
+        if (current_index < array_size.value())
         {
             apply(element);
             status = VisitorStatus::Ok;
@@ -58,6 +123,8 @@ public:
         current_range = 0;
         current_index = range_ptr->ranges[current_range].first;
         this->setExhausted(false);
+        array_size.reset();
+        array.reset();
     }
 
     void updateState() override
@@ -74,6 +141,9 @@ private:
     ASTJSONPathRange * range_ptr;
     size_t current_range;
     UInt32 current_index;
+    std::optional array_size{};
+    std::optional array{};
+    typename JSONParser::Element current_element{};
 };
 
 }
diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h b/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h
index 71569d3c0a0..cc9001110db 100644
--- a/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h
+++ b/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h
@@ -10,6 +10,7 @@ template
 class VisitorJSONPathRoot : public IVisitor
 {
 public:
+    using TElement = typename JSONParser::Element;
     explicit VisitorJSONPathRoot(ASTPtr) { }
 
     const char * getName() const override { return "VisitorJSONPathRoot"; }
@@ -27,9 +28,19 @@ public:
         return VisitorStatus::Ok;
     }
 
+    VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override
+    {
+        apply(element);
+        this->setExhausted(true);
+        if (can_reduce)
+            res_func(element);
+        return VisitorStatus::Ok;
+    }
+
     void reinitialize() override { this->setExhausted(false); }
 
     void updateState() override { }
+
 };
 
 }
diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h b/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h
index 0c297f64316..51b499bd892 100644
--- a/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h
+++ b/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include
 #include
 #include
 #include
@@ -19,21 +20,25 @@ public:
 
     VisitorStatus apply(typename JSONParser::Element & element) const override
     {
-        typename JSONParser::Array array = element.getArray();
-        element = array[current_index];
+        element = array.value()[current_index];
         return VisitorStatus::Ok;
     }
 
     VisitorStatus visit(typename JSONParser::Element & element) override
     {
-        if (!element.isArray())
+        if (!array && !element.isArray())
         {
             this->setExhausted(true);
             return VisitorStatus::Error;
         }
 
+        if (!array) /// same guard as visitBatch(); `array` and `array_size` are always set together here
+        {
+            array = element.getArray();
+            array_size = array.value().size();
+        }
         VisitorStatus status;
-        if (current_index < element.getArray().size())
+        if (current_index < array_size.value())
         {
             apply(element);
             status = VisitorStatus::Ok;
@@ -46,11 +51,53 @@ public:
         return status;
     }
 
+    using TElement = typename JSONParser::Element;
+    VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override
+    {
+        if (!array && !element.isArray())
+        {
+            this->setExhausted(true);
+            return VisitorStatus::Error;
+        }
+
+        if (!array)
+        {
+            array = element.getArray();
+            array_size = array.value().size();
+        }
+        VisitorStatus status = VisitorStatus::Ok;
+
+        if (can_reduce)
+        {
+            for (auto item: array.value())
+                res_func(item);
+
+            this->setExhausted(true);
+        }
+        else
+        {
+            if (current_index < array_size.value())
+            {
+                apply(element);
+                status = VisitorStatus::Ok;
+            }
+            else
+            {
+                status = VisitorStatus::Ignore;
+                this->setExhausted(true);
+            }
+
+        }
+
+        return status;
+    }
 
     void reinitialize() override
     {
         current_index = 0;
         this->setExhausted(false);
+        array_size.reset();
+        array.reset();
     }
 
     void updateState() override
@@ -59,7 +106,9 @@ public:
     }
 
 private:
-    UInt32 current_index;
+    UInt32 current_index{};
+    std::optional array{};
+    std::optional array_size{};
 };
 
 }