From 7f6dcb854b788a20a9f6900f245033d7ff345923 Mon Sep 17 00:00:00 2001 From: zhanglistar Date: Wed, 20 Nov 2024 15:06:09 +0800 Subject: [PATCH] add batch visitor --- base/base/defines.h | 2 +- src/Common/JSONParsers/DummyJSONParser.h | 2 +- src/Common/JSONParsers/SimdJSONParser.h | 49 +++++++++++-- src/Functions/FunctionSQLJSON.h | 24 +++--- .../JSONPath/Generator/GeneratorJSONPath.h | 32 ++++++-- src/Functions/JSONPath/Generator/IGenerator.h | 6 +- src/Functions/JSONPath/Generator/IVisitor.h | 6 +- .../Generator/VisitorJSONPathMemberAccess.h | 37 +++++++--- .../JSONPath/Generator/VisitorJSONPathRange.h | 73 +++++++++++++++++-- .../JSONPath/Generator/VisitorJSONPathRoot.h | 13 +++- .../JSONPath/Generator/VisitorJSONPathStar.h | 61 ++++++++++++++-- 11 files changed, 258 insertions(+), 47 deletions(-) diff --git a/base/base/defines.h b/base/base/defines.h index 78f5e94ee7a..6ef7528688c 100644 --- a/base/base/defines.h +++ b/base/base/defines.h @@ -169,5 +169,5 @@ constexpr void UNUSED(Args &&... args [[maybe_unused]]) // NOLINT(cppcoreguideli #define DB_CONCATENATE(s1, s2) DB_CONCATENATE_IMPL(s1, s2) #define DB_ANONYMOUS_VARIABLE(str) \ - DB_CONCATENATE(DB_CONCATENATE(DB_CONCATENATE(str, __COUNTER__), _), __LINE__) + DB_CONCATENATE(DB_CONCATENATE(DB_CONCATENATE(str, __COUNTER__), _), __LINE__) diff --git a/src/Common/JSONParsers/DummyJSONParser.h b/src/Common/JSONParsers/DummyJSONParser.h index 394a2817ea1..12432e2ecad 100644 --- a/src/Common/JSONParsers/DummyJSONParser.h +++ b/src/Common/JSONParsers/DummyJSONParser.h @@ -87,7 +87,7 @@ struct DummyJSONParser static Iterator end() { return {}; } static size_t size() { return 0; } bool find(std::string_view, Element &) const { return false; } /// NOLINT - + bool reset() { return true; } #if 0 /// Optional: Provides access to an object's element by index. KeyValuePair operator[](size_t) const { return {}; } diff --git a/src/Common/JSONParsers/SimdJSONParser.h b/src/Common/JSONParsers/SimdJSONParser.h index 412cf2fa0d6..70306cbe57e 100644 --- a/src/Common/JSONParsers/SimdJSONParser.h +++ b/src/Common/JSONParsers/SimdJSONParser.h @@ -1,5 +1,6 @@ #pragma once +#include "Common/StackTrace.h" #include "config.h" #if USE_SIMDJSON @@ -590,11 +591,13 @@ struct OnDemandSimdJSONParser } ALWAYS_INLINE Array getArray() const { - return value.get_array().value(); + SIMDJSON_ASSIGN_OR_THROW(auto arr, value.get_array()); + return arr; } ALWAYS_INLINE Object getObject() const { - return value.get_object().value(); + SIMDJSON_ASSIGN_OR_THROW(auto obj, value.get_object()); + return obj; } ALWAYS_INLINE simdjson::ondemand::value getElement() const { return value; } @@ -610,6 +613,7 @@ struct OnDemandSimdJSONParser class Iterator { public: + Iterator() = default; ALWAYS_INLINE Iterator(const simdjson::ondemand::array_iterator & it_) : it(it_) {} /// NOLINT ALWAYS_INLINE Element operator*() const { return (*it).value(); } ALWAYS_INLINE Iterator & operator++() { ++it; return *this; } @@ -623,9 +627,26 @@ struct OnDemandSimdJSONParser ALWAYS_INLINE Iterator begin() const { return array.begin().value(); } ALWAYS_INLINE Iterator end() const { return array.end().value(); } ALWAYS_INLINE size_t size() const { return array.count_elements().value(); } - ALWAYS_INLINE Element operator[](size_t index) const { return array.at(index).value(); } + ALWAYS_INLINE Element operator[](size_t index) const + { + if (index < last_index) + array.reset(); + if (last_index == 0) + { + SIMDJSON_ASSIGN_OR_THROW(auto iter, array.begin()); + it = iter; + } + size_t diff = index - last_index; + while (diff--) + ++it; + last_index = index; + SIMDJSON_ASSIGN_OR_THROW(auto ele, *it); + return ele; + } private: + mutable size_t last_index{}; + mutable simdjson::ondemand::array_iterator it; mutable simdjson::ondemand::array array; }; @@ -642,8 +663,18 @@ struct OnDemandSimdJSONParser ALWAYS_INLINE KeyValuePair operator*() const { - SIMDJSON_ASSIGN_OR_THROW(auto field_wrapper, *it); - SIMDJSON_ASSIGN_OR_THROW(std::string_view key, field_wrapper.unescaped_key()); + //SIMDJSON_ASSIGN_OR_THROW(auto field_wrapper, *it); + auto field_wrapper = *it; + if (field_wrapper.error()) + { + return {}; + } + std::string_view key; + auto key_error = field_wrapper.unescaped_key().get(key); + if (key_error) + { + return {}; + } ::simdjson::ondemand::value v = field_wrapper.value(); return {key, Element(std::move(v))}; } @@ -674,6 +705,14 @@ struct OnDemandSimdJSONParser return true; } + bool reset() + { + auto v = object.reset(); + if (v.error()) + return false; + return true; + } + /// Optional: Provides access to an object's element by index. KeyValuePair operator[](size_t index) const { diff --git a/src/Functions/FunctionSQLJSON.h b/src/Functions/FunctionSQLJSON.h index d56e655e126..300c6152b4d 100644 --- a/src/Functions/FunctionSQLJSON.h +++ b/src/Functions/FunctionSQLJSON.h @@ -102,8 +102,12 @@ public: } /// serialize the json element into column's buffer directly - void addElement(const Element & element) + void addElement(const Element & element, const std::string sep = ", ") { + if (is_first) + is_first = false; + else + addRawData(sep.data(), sep.size()); formatter.append(element.getElement()); } void commit() @@ -121,7 +125,7 @@ private: IColumn::Offsets & offsets; Formatter formatter; size_t prev_offset; - + bool is_first{true}; }; @@ -364,7 +368,7 @@ public: /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6), /// however this functionality is not implemented yet } - current_element = root; + //current_element = root; } if (status == VisitorStatus::Exhausted) @@ -416,19 +420,16 @@ public: bool success = false; const char * array_begin = "["; const char * array_end = "]"; - const char * comma = ", "; + //const char * comma = ", "; JSONStringSerializer json_serializer(col_str); json_serializer.addRawData(array_begin, 1); - while ((status = generator_json_path.getNextItem(current_element)) != VisitorStatus::Exhausted) + //std::vector result{}; + std::function result_func= [&json_serializer](const Element & element) { json_serializer.addElement(element); }; + while ((status = generator_json_path.getNextItemBatch(current_element, result_func)) != VisitorStatus::Exhausted) { if (status == VisitorStatus::Ok) { - if (success) - { - json_serializer.addRawData(comma, 2); - } success = true; - json_serializer.addElement(current_element); } else if (status == VisitorStatus::Error) { @@ -436,7 +437,8 @@ public: /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6), /// however this functionality is not implemented yet } - current_element = root; + //current_element = root; + //result.clear(); } if (!success) { diff --git a/src/Functions/JSONPath/Generator/GeneratorJSONPath.h b/src/Functions/JSONPath/Generator/GeneratorJSONPath.h index 1016d776be0..02ef44a3adc 100644 --- a/src/Functions/JSONPath/Generator/GeneratorJSONPath.h +++ b/src/Functions/JSONPath/Generator/GeneratorJSONPath.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -21,6 +22,7 @@ template class GeneratorJSONPath : public IGenerator { public: + using TElement = typename JSONParser::Element; /** * Traverses children ASTs of ASTJSONPathQuery and creates a vector of corresponding visitors * @param query_ptr_ pointer to ASTJSONPathQuery @@ -80,11 +82,6 @@ public: return VisitorStatus::Exhausted; } - for (int i = 0; i < current_visitor; ++i) - { - visitors[i]->apply(current); - } - VisitorStatus status = VisitorStatus::Error; for (size_t i = current_visitor; i < visitors.size(); ++i) { @@ -105,6 +102,31 @@ public: } } + VisitorStatus getNextItemBatch(TElement & element, std::function & res_func) override + { + while (true) + { + /// element passed to us actually is root, so here we assign current to root + auto current = element; + if (current_visitor < 0) + return VisitorStatus::Exhausted; + + VisitorStatus status = VisitorStatus::Error; + size_t visitor_size = visitors.size(); + for (size_t i = current_visitor; i < visitor_size; ++i) + { + status = visitors[i]->visitBatch(current, res_func, i == visitor_size - 1); + current_visitor = static_cast(i); + if (status == VisitorStatus::Error || status == VisitorStatus::Ignore) + break; + } + updateVisitorsForNextRun(); + + if (status != VisitorStatus::Ignore) + return status; + } + } + void reinitialize() { while (current_visitor >= 0) diff --git a/src/Functions/JSONPath/Generator/IGenerator.h b/src/Functions/JSONPath/Generator/IGenerator.h index d3a6c32de72..a91c940316d 100644 --- a/src/Functions/JSONPath/Generator/IGenerator.h +++ b/src/Functions/JSONPath/Generator/IGenerator.h @@ -10,6 +10,8 @@ template class IGenerator { public: + using TElement = typename JSONParser::Element; + IGenerator() = default; virtual const char * getName() const = 0; @@ -20,7 +22,9 @@ public: * @param element to be extracted into * @return true if generator is not exhausted */ - virtual VisitorStatus getNextItem(typename JSONParser::Element & element) = 0; + virtual VisitorStatus getNextItem(TElement & element) = 0; + + virtual VisitorStatus getNextItemBatch(TElement & element, std::function & ) = 0; virtual ~IGenerator() = default; }; diff --git a/src/Functions/JSONPath/Generator/IVisitor.h b/src/Functions/JSONPath/Generator/IVisitor.h index 1a94106a435..1a39a09bad6 100644 --- a/src/Functions/JSONPath/Generator/IVisitor.h +++ b/src/Functions/JSONPath/Generator/IVisitor.h @@ -8,6 +8,7 @@ template class IVisitor { public: + using TElement = typename JSONParser::Element; virtual const char * getName() const = 0; /** @@ -15,12 +16,13 @@ public: * @param element simdjson element */ virtual VisitorStatus visit(typename JSONParser::Element & element) = 0; + virtual VisitorStatus visitBatch(TElement & element, std::function & , bool ) = 0; - /** + /** * Applies this visitor to document, but does not mutate state * @param element simdjson element */ - virtual VisitorStatus apply(typename JSONParser::Element & element) const = 0; + virtual VisitorStatus apply(typename JSONParser::Element & element) = 0; /** * Restores visitor's initial state for later use diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h b/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h index 8446e1ff3be..efbd3d9fba6 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -10,33 +11,51 @@ template class VisitorJSONPathMemberAccess : public IVisitor { public: + using TElement = JSONParser::Element; explicit VisitorJSONPathMemberAccess(ASTPtr member_access_ptr_) : member_access_ptr(member_access_ptr_->as()) { } const char * getName() const override { return "VisitorJSONPathMemberAccess"; } - VisitorStatus apply(typename JSONParser::Element & element) const override + VisitorStatus apply(typename JSONParser::Element & element) override { typename JSONParser::Element result; - element.getObject().find(std::string_view(member_access_ptr->member_name), result); - element = result; + auto obj = element.getObject(); + if (!obj.find(std::string_view(member_access_ptr->member_name), result)) + { + return VisitorStatus::Error; + } + element = std::move(result); return VisitorStatus::Ok; } VisitorStatus visit(typename JSONParser::Element & element) override { + if (this->isExhausted()) + { + return VisitorStatus::Exhausted; + } this->setExhausted(true); if (!element.isObject()) { return VisitorStatus::Error; } - typename JSONParser::Element result; - if (!element.getObject().find(std::string_view(member_access_ptr->member_name), result)) - { + return apply(element); + } + + VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override + { + if (this->isExhausted()) + return VisitorStatus::Exhausted; + this->setExhausted(true); + if (!element.isObject()) return VisitorStatus::Error; - } - apply(element); - return VisitorStatus::Ok; + + auto status = apply(element); + if (status == VisitorStatus::Ok && can_reduce) + res_func(element); + + return status; } void reinitialize() override { this->setExhausted(false); } diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h b/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h index 708a71f7cf4..7cb429d59bb 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathRange.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -18,23 +19,80 @@ public: const char * getName() const override { return "VisitorJSONPathRange"; } - VisitorStatus apply(typename JSONParser::Element & element) const override + VisitorStatus apply(typename JSONParser::Element & element) override { - typename JSONParser::Array array = element.getArray(); - element = array[current_index]; + element = (*array)[current_index]; return VisitorStatus::Ok; } + using TElement = JSONParser::Element; + VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override + { + if (!array && !element.isArray()) + { + this->setExhausted(true); + return VisitorStatus::Error; + } + + VisitorStatus status = VisitorStatus::Ok; + if (!array) + { + current_element = element; + array = current_element.getArray(); + if (!can_reduce) + array_size = array.value().size(); + } + + if (can_reduce) + { + std::set index_set{}; + for (auto range: range_ptr->ranges) + for (size_t i = range.first ; i < range.second; ++i) + index_set.insert(i); + + size_t idx = 0; + for (auto item: array.value()) + if (index_set.find(idx++) != index_set.end()) + res_func(item); + + this->setExhausted(true); + status = VisitorStatus::Ok; + } + else + { + if (current_index < array_size.value()) + { + apply(element); + status = VisitorStatus::Ok; + } + else + status = VisitorStatus::Ignore; + + if (current_index + 1 == range_ptr->ranges[current_range].second + && current_range + 1 == range_ptr->ranges.size()) + { + this->setExhausted(true); + } + } + + return status; + } VisitorStatus visit(typename JSONParser::Element & element) override { - if (!element.isArray()) + if (!array && !element.isArray()) { this->setExhausted(true); return VisitorStatus::Error; } VisitorStatus status; - if (current_index < element.getArray().size()) + if (!array) + { + current_element = element; + array = current_element.getArray(); + array_size = array.value().size(); + } + if (current_index < array_size.value()) { apply(element); status = VisitorStatus::Ok; @@ -58,6 +116,8 @@ public: current_range = 0; current_index = range_ptr->ranges[current_range].first; this->setExhausted(false); + array_size.reset(); + array.reset(); } void updateState() override @@ -74,6 +134,9 @@ private: ASTJSONPathRange * range_ptr; size_t current_range; UInt32 current_index; + std::optional array_size{}; + std::optional array{}; + typename JSONParser::Element current_element{}; }; } diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h b/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h index 71569d3c0a0..6f980073030 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathRoot.h @@ -10,11 +10,12 @@ template class VisitorJSONPathRoot : public IVisitor { public: + using TElement = JSONParser::Element; explicit VisitorJSONPathRoot(ASTPtr) { } const char * getName() const override { return "VisitorJSONPathRoot"; } - VisitorStatus apply(typename JSONParser::Element & /*element*/) const override + VisitorStatus apply(typename JSONParser::Element & /*element*/) override { /// No-op on document, since we are already passed document's root return VisitorStatus::Ok; @@ -27,9 +28,19 @@ public: return VisitorStatus::Ok; } + VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override + { + apply(element); + this->setExhausted(true); + if (can_reduce) + res_func(element); + return VisitorStatus::Ok; + } + void reinitialize() override { this->setExhausted(false); } void updateState() override { } + }; } diff --git a/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h b/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h index 0c297f64316..d36a323abe7 100644 --- a/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h +++ b/src/Functions/JSONPath/Generator/VisitorJSONPathStar.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -17,23 +18,27 @@ public: const char * getName() const override { return "VisitorJSONPathStar"; } - VisitorStatus apply(typename JSONParser::Element & element) const override + VisitorStatus apply(typename JSONParser::Element & element) override { - typename JSONParser::Array array = element.getArray(); - element = array[current_index]; + element = array.value()[current_index]; return VisitorStatus::Ok; } VisitorStatus visit(typename JSONParser::Element & element) override { - if (!element.isArray()) + if (!array && !element.isArray()) { this->setExhausted(true); return VisitorStatus::Error; } + if (!array_size) + { + array = element.getArray(); + array_size = array.value().size(); + } VisitorStatus status; - if (current_index < element.getArray().size()) + if (current_index < array_size.value()) { apply(element); status = VisitorStatus::Ok; @@ -46,11 +51,53 @@ public: return status; } + using TElement = JSONParser::Element; + VisitorStatus visitBatch(TElement & element, std::function & res_func, bool can_reduce) override + { + if (!array && !element.isArray()) + { + this->setExhausted(true); + return VisitorStatus::Error; + } + + if (!array) + { + array = element.getArray(); + array_size = array.value().size(); + } + VisitorStatus status = VisitorStatus::Ok; + + if (can_reduce) + { + for (auto item: array.value()) + res_func(item); + + this->setExhausted(true); + } + else + { + if (current_index < array_size.value()) + { + apply(element); + status = VisitorStatus::Ok; + } + else + { + status = VisitorStatus::Ignore; + this->setExhausted(true); + } + + } + + return status; + } void reinitialize() override { current_index = 0; this->setExhausted(false); + array_size.reset(); + array.reset(); } void updateState() override @@ -59,7 +106,9 @@ public: } private: - UInt32 current_index; + UInt32 current_index{}; + std::optional array{}; + std::optional array_size{}; }; }