add batch visitor

This commit is contained in:
zhanglistar 2024-11-20 15:06:09 +08:00
parent 93464f52f4
commit 7f6dcb854b
11 changed files with 258 additions and 47 deletions

View File

@ -87,7 +87,7 @@ struct DummyJSONParser
static Iterator end() { return {}; }
static size_t size() { return 0; }
bool find(std::string_view, Element &) const { return false; } /// NOLINT
bool reset() { return true; }
#if 0
/// Optional: Provides access to an object's element by index.
KeyValuePair operator[](size_t) const { return {}; }

View File

@ -1,5 +1,6 @@
#pragma once
#include "Common/StackTrace.h"
#include "config.h"
#if USE_SIMDJSON
@ -590,11 +591,13 @@ struct OnDemandSimdJSONParser
}
ALWAYS_INLINE Array getArray() const
{
return value.get_array().value();
SIMDJSON_ASSIGN_OR_THROW(auto arr, value.get_array());
return arr;
}
ALWAYS_INLINE Object getObject() const
{
return value.get_object().value();
SIMDJSON_ASSIGN_OR_THROW(auto obj, value.get_object());
return obj;
}
ALWAYS_INLINE simdjson::ondemand::value getElement() const { return value; }
@ -610,6 +613,7 @@ struct OnDemandSimdJSONParser
class Iterator
{
public:
Iterator() = default;
ALWAYS_INLINE Iterator(const simdjson::ondemand::array_iterator & it_) : it(it_) {} /// NOLINT
ALWAYS_INLINE Element operator*() const { return (*it).value(); }
ALWAYS_INLINE Iterator & operator++() { ++it; return *this; }
@ -623,9 +627,26 @@ struct OnDemandSimdJSONParser
ALWAYS_INLINE Iterator begin() const { return array.begin().value(); }
ALWAYS_INLINE Iterator end() const { return array.end().value(); }
ALWAYS_INLINE size_t size() const { return array.count_elements().value(); }
ALWAYS_INLINE Element operator[](size_t index) const { return array.at(index).value(); }
ALWAYS_INLINE Element operator[](size_t index) const
{
if (index < last_index)
array.reset();
if (last_index == 0)
{
SIMDJSON_ASSIGN_OR_THROW(auto iter, array.begin());
it = iter;
}
size_t diff = index - last_index;
while (diff--)
++it;
last_index = index;
SIMDJSON_ASSIGN_OR_THROW(auto ele, *it);
return ele;
}
private:
mutable size_t last_index{};
mutable simdjson::ondemand::array_iterator it;
mutable simdjson::ondemand::array array;
};
@ -642,8 +663,18 @@ struct OnDemandSimdJSONParser
ALWAYS_INLINE KeyValuePair operator*() const
{
SIMDJSON_ASSIGN_OR_THROW(auto field_wrapper, *it);
SIMDJSON_ASSIGN_OR_THROW(std::string_view key, field_wrapper.unescaped_key());
//SIMDJSON_ASSIGN_OR_THROW(auto field_wrapper, *it);
auto field_wrapper = *it;
if (field_wrapper.error())
{
return {};
}
std::string_view key;
auto key_error = field_wrapper.unescaped_key().get(key);
if (key_error)
{
return {};
}
::simdjson::ondemand::value v = field_wrapper.value();
return {key, Element(std::move(v))};
}
@ -674,6 +705,14 @@ struct OnDemandSimdJSONParser
return true;
}
bool reset()
{
auto v = object.reset();
if (v.error())
return false;
return true;
}
/// Optional: Provides access to an object's element by index.
KeyValuePair operator[](size_t index) const
{

View File

@ -102,8 +102,12 @@ public:
}
/// serialize the json element into column's buffer directly
void addElement(const Element & element)
void addElement(const Element & element, const std::string sep = ", ")
{
if (is_first)
is_first = false;
else
addRawData(sep.data(), sep.size());
formatter.append(element.getElement());
}
void commit()
@ -121,7 +125,7 @@ private:
IColumn::Offsets & offsets;
Formatter formatter;
size_t prev_offset;
bool is_first{true};
};
@ -364,7 +368,7 @@ public:
/// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6),
/// however this functionality is not implemented yet
}
current_element = root;
//current_element = root;
}
if (status == VisitorStatus::Exhausted)
@ -416,19 +420,16 @@ public:
bool success = false;
const char * array_begin = "[";
const char * array_end = "]";
const char * comma = ", ";
//const char * comma = ", ";
JSONStringSerializer json_serializer(col_str);
json_serializer.addRawData(array_begin, 1);
while ((status = generator_json_path.getNextItem(current_element)) != VisitorStatus::Exhausted)
//std::vector<Element> result{};
std::function<void(const Element&)> result_func= [&json_serializer](const Element & element) { json_serializer.addElement(element); };
while ((status = generator_json_path.getNextItemBatch(current_element, result_func)) != VisitorStatus::Exhausted)
{
if (status == VisitorStatus::Ok)
{
if (success)
{
json_serializer.addRawData(comma, 2);
}
success = true;
json_serializer.addElement(current_element);
}
else if (status == VisitorStatus::Error)
{
@ -436,7 +437,8 @@ public:
/// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6),
/// however this functionality is not implemented yet
}
current_element = root;
//current_element = root;
//result.clear();
}
if (!success)
{

View File

@ -1,5 +1,6 @@
#pragma once
#include <utility>
#include <Functions/JSONPath/Generator/IGenerator.h>
#include <Functions/JSONPath/Generator/VisitorJSONPathMemberAccess.h>
#include <Functions/JSONPath/Generator/VisitorJSONPathRange.h>
@ -21,6 +22,7 @@ template <typename JSONParser>
class GeneratorJSONPath : public IGenerator<JSONParser>
{
public:
using TElement = typename JSONParser::Element;
/**
* Traverses children ASTs of ASTJSONPathQuery and creates a vector of corresponding visitors
* @param query_ptr_ pointer to ASTJSONPathQuery
@ -80,11 +82,6 @@ public:
return VisitorStatus::Exhausted;
}
for (int i = 0; i < current_visitor; ++i)
{
visitors[i]->apply(current);
}
VisitorStatus status = VisitorStatus::Error;
for (size_t i = current_visitor; i < visitors.size(); ++i)
{
@ -105,6 +102,31 @@ public:
}
}
VisitorStatus getNextItemBatch(TElement & element, std::function<void(const TElement &)> & res_func) override
{
while (true)
{
/// element passed to us actually is root, so here we assign current to root
auto current = element;
if (current_visitor < 0)
return VisitorStatus::Exhausted;
VisitorStatus status = VisitorStatus::Error;
size_t visitor_size = visitors.size();
for (size_t i = current_visitor; i < visitor_size; ++i)
{
status = visitors[i]->visitBatch(current, res_func, i == visitor_size - 1);
current_visitor = static_cast<int>(i);
if (status == VisitorStatus::Error || status == VisitorStatus::Ignore)
break;
}
updateVisitorsForNextRun();
if (status != VisitorStatus::Ignore)
return status;
}
}
void reinitialize()
{
while (current_visitor >= 0)

View File

@ -10,6 +10,8 @@ template <typename JSONParser>
class IGenerator
{
public:
using TElement = typename JSONParser::Element;
IGenerator() = default;
virtual const char * getName() const = 0;
@ -20,7 +22,9 @@ public:
* @param element to be extracted into
* @return true if generator is not exhausted
*/
virtual VisitorStatus getNextItem(typename JSONParser::Element & element) = 0;
virtual VisitorStatus getNextItem(TElement & element) = 0;
virtual VisitorStatus getNextItemBatch(TElement & element, std::function<void(const TElement &)> & ) = 0;
virtual ~IGenerator() = default;
};

View File

@ -8,6 +8,7 @@ template <typename JSONParser>
class IVisitor
{
public:
using TElement = typename JSONParser::Element;
virtual const char * getName() const = 0;
/**
@ -15,12 +16,13 @@ public:
* @param element simdjson element
*/
virtual VisitorStatus visit(typename JSONParser::Element & element) = 0;
virtual VisitorStatus visitBatch(TElement & element, std::function<void(const TElement &)> & , bool ) = 0;
/**
* Applies this visitor to document, but does not mutate state
* @param element simdjson element
*/
virtual VisitorStatus apply(typename JSONParser::Element & element) const = 0;
virtual VisitorStatus apply(typename JSONParser::Element & element) = 0;
/**
* Restores visitor's initial state for later use

View File

@ -1,5 +1,6 @@
#pragma once
#include <optional>
#include <Functions/JSONPath/ASTs/ASTJSONPathMemberAccess.h>
#include <Functions/JSONPath/Generator/IVisitor.h>
#include <Functions/JSONPath/Generator/VisitorStatus.h>
@ -10,33 +11,51 @@ template <typename JSONParser>
class VisitorJSONPathMemberAccess : public IVisitor<JSONParser>
{
public:
using TElement = JSONParser::Element;
explicit VisitorJSONPathMemberAccess(ASTPtr member_access_ptr_)
: member_access_ptr(member_access_ptr_->as<ASTJSONPathMemberAccess>()) { }
const char * getName() const override { return "VisitorJSONPathMemberAccess"; }
VisitorStatus apply(typename JSONParser::Element & element) const override
VisitorStatus apply(typename JSONParser::Element & element) override
{
typename JSONParser::Element result;
element.getObject().find(std::string_view(member_access_ptr->member_name), result);
element = result;
auto obj = element.getObject();
if (!obj.find(std::string_view(member_access_ptr->member_name), result))
{
return VisitorStatus::Error;
}
element = std::move(result);
return VisitorStatus::Ok;
}
VisitorStatus visit(typename JSONParser::Element & element) override
{
if (this->isExhausted())
{
return VisitorStatus::Exhausted;
}
this->setExhausted(true);
if (!element.isObject())
{
return VisitorStatus::Error;
}
typename JSONParser::Element result;
if (!element.getObject().find(std::string_view(member_access_ptr->member_name), result))
{
return VisitorStatus::Error;
return apply(element);
}
apply(element);
return VisitorStatus::Ok;
VisitorStatus visitBatch(TElement & element, std::function<void(const TElement &)> & res_func, bool can_reduce) override
{
if (this->isExhausted())
return VisitorStatus::Exhausted;
this->setExhausted(true);
if (!element.isObject())
return VisitorStatus::Error;
auto status = apply(element);
if (status == VisitorStatus::Ok && can_reduce)
res_func(element);
return status;
}
void reinitialize() override { this->setExhausted(false); }

View File

@ -1,5 +1,6 @@
#pragma once
#include <optional>
#include <Functions/JSONPath/ASTs/ASTJSONPathRange.h>
#include <Functions/JSONPath/Generator/IVisitor.h>
#include <Functions/JSONPath/Generator/VisitorStatus.h>
@ -18,23 +19,80 @@ public:
const char * getName() const override { return "VisitorJSONPathRange"; }
VisitorStatus apply(typename JSONParser::Element & element) const override
VisitorStatus apply(typename JSONParser::Element & element) override
{
typename JSONParser::Array array = element.getArray();
element = array[current_index];
element = (*array)[current_index];
return VisitorStatus::Ok;
}
using TElement = JSONParser::Element;
VisitorStatus visitBatch(TElement & element, std::function<void(const TElement &)> & res_func, bool can_reduce) override
{
if (!array && !element.isArray())
{
this->setExhausted(true);
return VisitorStatus::Error;
}
VisitorStatus status = VisitorStatus::Ok;
if (!array)
{
current_element = element;
array = current_element.getArray();
if (!can_reduce)
array_size = array.value().size();
}
if (can_reduce)
{
std::set<size_t> index_set{};
for (auto range: range_ptr->ranges)
for (size_t i = range.first ; i < range.second; ++i)
index_set.insert(i);
size_t idx = 0;
for (auto item: array.value())
if (index_set.find(idx++) != index_set.end())
res_func(item);
this->setExhausted(true);
status = VisitorStatus::Ok;
}
else
{
if (current_index < array_size.value())
{
apply(element);
status = VisitorStatus::Ok;
}
else
status = VisitorStatus::Ignore;
if (current_index + 1 == range_ptr->ranges[current_range].second
&& current_range + 1 == range_ptr->ranges.size())
{
this->setExhausted(true);
}
}
return status;
}
VisitorStatus visit(typename JSONParser::Element & element) override
{
if (!element.isArray())
if (!array && !element.isArray())
{
this->setExhausted(true);
return VisitorStatus::Error;
}
VisitorStatus status;
if (current_index < element.getArray().size())
if (!array)
{
current_element = element;
array = current_element.getArray();
array_size = array.value().size();
}
if (current_index < array_size.value())
{
apply(element);
status = VisitorStatus::Ok;
@ -58,6 +116,8 @@ public:
current_range = 0;
current_index = range_ptr->ranges[current_range].first;
this->setExhausted(false);
array_size.reset();
array.reset();
}
void updateState() override
@ -74,6 +134,9 @@ private:
ASTJSONPathRange * range_ptr;
size_t current_range;
UInt32 current_index;
std::optional<size_t> array_size{};
std::optional<typename JSONParser::Array> array{};
typename JSONParser::Element current_element{};
};
}

View File

@ -10,11 +10,12 @@ template <typename JSONParser>
class VisitorJSONPathRoot : public IVisitor<JSONParser>
{
public:
using TElement = JSONParser::Element;
explicit VisitorJSONPathRoot(ASTPtr) { }
const char * getName() const override { return "VisitorJSONPathRoot"; }
VisitorStatus apply(typename JSONParser::Element & /*element*/) const override
VisitorStatus apply(typename JSONParser::Element & /*element*/) override
{
/// No-op on document, since we are already passed document's root
return VisitorStatus::Ok;
@ -27,9 +28,19 @@ public:
return VisitorStatus::Ok;
}
VisitorStatus visitBatch(TElement & element, std::function<void(const TElement &)> & res_func, bool can_reduce) override
{
apply(element);
this->setExhausted(true);
if (can_reduce)
res_func(element);
return VisitorStatus::Ok;
}
void reinitialize() override { this->setExhausted(false); }
void updateState() override { }
};
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <optional>
#include <Functions/JSONPath/ASTs/ASTJSONPathStar.h>
#include <Functions/JSONPath/Generator/IVisitor.h>
#include <Functions/JSONPath/Generator/VisitorStatus.h>
@ -17,23 +18,27 @@ public:
const char * getName() const override { return "VisitorJSONPathStar"; }
VisitorStatus apply(typename JSONParser::Element & element) const override
VisitorStatus apply(typename JSONParser::Element & element) override
{
typename JSONParser::Array array = element.getArray();
element = array[current_index];
element = array.value()[current_index];
return VisitorStatus::Ok;
}
VisitorStatus visit(typename JSONParser::Element & element) override
{
if (!element.isArray())
if (!array && !element.isArray())
{
this->setExhausted(true);
return VisitorStatus::Error;
}
if (!array_size)
{
array = element.getArray();
array_size = array.value().size();
}
VisitorStatus status;
if (current_index < element.getArray().size())
if (current_index < array_size.value())
{
apply(element);
status = VisitorStatus::Ok;
@ -46,11 +51,53 @@ public:
return status;
}
using TElement = JSONParser::Element;
VisitorStatus visitBatch(TElement & element, std::function<void(const TElement &)> & res_func, bool can_reduce) override
{
if (!array && !element.isArray())
{
this->setExhausted(true);
return VisitorStatus::Error;
}
if (!array)
{
array = element.getArray();
array_size = array.value().size();
}
VisitorStatus status = VisitorStatus::Ok;
if (can_reduce)
{
for (auto item: array.value())
res_func(item);
this->setExhausted(true);
}
else
{
if (current_index < array_size.value())
{
apply(element);
status = VisitorStatus::Ok;
}
else
{
status = VisitorStatus::Ignore;
this->setExhausted(true);
}
}
return status;
}
void reinitialize() override
{
current_index = 0;
this->setExhausted(false);
array_size.reset();
array.reset();
}
void updateState() override
@ -59,7 +106,9 @@ public:
}
private:
UInt32 current_index;
UInt32 current_index{};
std::optional<typename JSONParser::Array> array{};
std::optional<size_t> array_size{};
};
}