#pragma once

// NOTE(review): every include target below is missing, and the template
// parameter / argument lists throughout this file are absent (a bare
// `template class ...`, `assert_cast(dest)`, `std::make_shared()`, ...).
// It looks like all text between angle brackets was stripped at some point -
// TODO restore from version control. The code tokens are kept exactly as found.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "config.h"

namespace DB
{
namespace ErrorCodes
{
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
    extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
    extern const int BAD_ARGUMENTS;
}

/// Serializer that buffers its output in a std::stringstream and copies the
/// buffered text into the destination ColumnString on commit().
/// operator << is implemented for JSON elements, so a stringstream can be used to serialize them.
/// However, stringstream has bad performance, so using this serializer is not recommended.
// NOTE(review): the body uses a type named `Element`; presumably it is the
// (missing) template parameter - confirm.
template class DefaultJSONStringSerializer
{
public:
    /// Binds the serializer to the column that commit() will insert into.
    explicit DefaultJSONStringSerializer(ColumnString & col_str_) : col_str(col_str_) { }

    /// Appends `len` bytes starting at `ptr` to the pending output.
    inline void addRawData(const char * ptr, size_t len) { out << std::string_view(ptr, len); }

    /// Appends `str` to the pending output.
    inline void addRawString(std::string_view str) { out << str; }

    /// Serialize the JSON element into the stringstream.
    inline void addElement(const Element & element) { out << element.getElement(); }

    /// Inserts everything buffered so far into the column as one value.
    inline void commit()
    {
        auto out_str = out.str();
        col_str.insertData(out_str.data(), out_str.size());
    }

    /// Nothing to undo: the column is only touched by commit().
    inline void rollback() {}

private:
    ColumnString & col_str;
    std::stringstream out; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
};

/// A more efficient way to serialize JSON elements into the destination column.
/// The formatter takes the chars buffer of the ColumnString and puts data into it directly.
// NOTE(review): the body uses types named `Element` and `Formatter`; presumably
// both are (missing) template parameters - confirm.
template class JSONStringSerializer
{
public:
    /// Captures the column's chars/offsets buffers and remembers the last
    /// committed offset (0 for an empty column) for use by rollback().
    explicit JSONStringSerializer(ColumnString & col_str_)
        : col_str(col_str_), chars(col_str_.getChars()), offsets(col_str_.getOffsets()), formatter(col_str_.getChars())
    {
        prev_offset = offsets.empty() ? 0 : offsets.back();
    }

    /// Put the data into the column's buffer directly.
    inline void addRawData(const char * ptr, size_t len) { chars.insert(ptr, ptr + len); }

    /// Put the string's bytes into the column's buffer directly.
    inline void addRawString(std::string_view str) { chars.insert(str.data(), str.data() + str.size()); }

    /// Serialize the JSON element into the column's buffer directly.
    inline void addElement(const Element & element) { formatter.append(element.getElement()); }

    /// Finishes the current value: appends a zero byte and records the new end offset.
    inline void commit()
    {
        chars.push_back(0);
        offsets.push_back(chars.size());
    }

    /// Drops everything written since construction by shrinking chars back to prev_offset.
    inline void rollback() { chars.resize(prev_offset); }

private:
    ColumnString & col_str;
    ColumnString::Chars & chars;
    IColumn::Offsets & offsets;
    Formatter formatter;
    /// Value of offsets.back() (or 0) at construction time.
    size_t prev_offset;
};


class FunctionSQLJSONHelpers
{
public:
    /// Shared driver for the JSONPath functions: validates the arguments, parses
    /// the JSONPath once, then parses each row's JSON and lets `Impl` write the result.
    // NOTE(review): the body uses `Impl` and `JSONParser` as types, and callers
    // instantiate Executor with three arguments starting with `Name`; the
    // template parameter list itself is missing - confirm.
    template class Executor
    {
    public:
        /// Runs the function over `input_rows_count` rows.
        ///
        /// @param arguments         arguments[0] is the JSON string column, arguments[1] the constant JSONPath string
        /// @param result_type       type used to create the result column
        /// @param input_rows_count  number of rows to process
        /// @param parse_depth       passed to IParser::Pos for parsing the JSONPath
        /// @param parse_backtracks  passed to IParser::Pos for parsing the JSONPath
        /// @param context           forwarded to Impl::insertResultToColumn
        /// @return the filled result column
        /// @throws Exception TOO_FEW_ARGUMENTS_FOR_FUNCTION if fewer than 2 arguments are given,
        ///         ILLEGAL_TYPE_OF_ARGUMENT if an argument is not a string or the path is not constant,
        ///         BAD_ARGUMENTS if the JSONPath cannot be parsed
        static ColumnPtr run(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, uint32_t parse_depth, uint32_t parse_backtracks, const ContextPtr & context)
        {
            MutableColumnPtr to{result_type->createColumn()};
            to->reserve(input_rows_count);

            if (arguments.size() < 2)
            {
                throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION, "JSONPath functions require at least 2 arguments");
            }

            const auto & json_column = arguments[0];

            if (!isString(json_column.type))
            {
                throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
                                "JSONPath functions require first argument to be JSON of string, illegal type: {}",
                                json_column.type->getName());
            }

            const auto & json_path_column = arguments[1];

            if (!isString(json_path_column.type))
            {
                throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
                                "JSONPath functions require second argument "
                                "to be JSONPath of type string, illegal type: {}", json_path_column.type->getName());
            }
            if (!isColumnConst(*json_path_column.column))
            {
                throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Second argument (JSONPath) must be constant string");
            }

            /// Prepare to parse 1 argument (JSONPath)
            // NOTE(review): typeid_cast / getValue have lost their template arguments.
            String query = typeid_cast(*json_path_column.column).getValue();

            /// Tokenize the query
            Tokens tokens(query.data(), query.data() + query.size());
            /// Max depth 0 indicates that depth is not limited
            IParser::Pos token_iterator(tokens, parse_depth, parse_backtracks);

            /// Parse query and create AST tree
            Expected expected;
            ASTPtr res;
            ParserJSONPath parser;
            const bool parse_res = parser.parse(token_iterator, res, expected);
            if (!parse_res)
            {
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unable to parse JSONPath");
            }

            JSONParser json_parser;
            using Element = typename JSONParser::Element;
            Element document;
            bool document_ok = false;

            /// Parse JSON for every row
            Impl impl;
            GeneratorJSONPath generator_json_path(res);
            for (size_t i = 0; i < input_rows_count; ++i)
            {
                std::string_view json = json_column.column->getDataAt(i).toView();
                document_ok = json_parser.parse(json, document);

                bool added_to_column = false;
                if (document_ok)
                {
                    /// Instead of creating a new generator for each row, we can reuse the same one.
                    generator_json_path.reinitialize();
                    added_to_column = impl.insertResultToColumn(*to, document, generator_json_path, context);
                }
                /// Rows whose JSON failed to parse, or for which Impl reported no result, get the default value.
                if (!added_to_column)
                {
                    to->insertDefault();
                }
            }
            return to;
        }
    };
};

/// IFunction wrapper for one JSONPath function. `Name::name` supplies the
/// function name; `Impl` supplies the return type and the per-row logic.
// NOTE(review): most of the template parameter list is missing here; only the
// trailing `typename Impl>` survived.
template typename Impl>
class FunctionSQLJSON : public IFunction, WithConstContext
{
public:
    // NOTE(review): std::make_shared has lost its template argument.
    static FunctionPtr create(ContextPtr context_) { return std::make_shared(context_); }
    explicit FunctionSQLJSON(ContextPtr context_) : WithConstContext(context_) { }

    static constexpr auto name = Name::name;
    String getName() const override { return Name::name; }
    /// Variadic, so the declared argument count is 0; Executor::run checks the real count.
    bool isVariadic() const override { return true; }
    size_t getNumberOfArguments() const override { return 0; }
    bool useDefaultImplementationForConstants() const override { return true; }
    /// Argument index 1 is the JSONPath, which Executor::run requires to be constant.
    ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
    bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }

    /// Delegates the return type to Impl::getReturnType.
    // NOTE(review): the template arguments of `Impl` are missing here.
    DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
    {
        return Impl>::getReturnType(
            Name::name, arguments, getContext());
    }

    /// Reads the parser limits from the settings and dispatches to Executor::run,
    /// using SimdJSONParser when built with USE_SIMDJSON and the allow_simdjson
    /// setting is on, and DummyJSONParser otherwise.
    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
    {
        /// Choose JSONParser.
        /// 1. Lexer(path) -> Tokens
        /// 2. Create ASTPtr
        /// 3. Parser(Tokens, ASTPtr) -> complete AST
        /// 4. Execute functions: call getNextItem on generator and handle each item
        // NOTE(review): the static_cast target types are missing.
        unsigned parse_depth = static_cast(getContext()->getSettingsRef().max_parser_depth);
        unsigned parse_backtracks = static_cast(getContext()->getSettingsRef().max_parser_backtracks);
#if USE_SIMDJSON
        if (getContext()->getSettingsRef().allow_simdjson)
            return FunctionSQLJSONHelpers::Executor<
                Name,
                Impl>,
                SimdJSONParser>::run(arguments, result_type, input_rows_count, parse_depth, parse_backtracks, getContext());
#endif
        return FunctionSQLJSONHelpers::
            Executor>, DummyJSONParser>::run(
                arguments, result_type, input_rows_count, parse_depth, parse_backtracks, getContext());
    }
};

/// Name tags consumed through `Name::name` by FunctionSQLJSON.
struct NameJSONExists
{
    static constexpr auto name{"JSON_EXISTS"};
};

struct NameJSONValue
{
    static constexpr auto name{"JSON_VALUE"};
};

struct NameJSONQuery
{
    static constexpr auto name{"JSON_QUERY"};
};

/// Implementation of JSON_EXISTS: writes 1 if the path generator yields at
/// least one item for the document, 0 otherwise.
// NOTE(review): the template parameter list is missing; the body uses `JSONParser`.
template class JSONExistsImpl
{
public:
    using Element = typename JSONParser::Element;

    // NOTE(review): std::make_shared has lost its template argument; the result
    // column is later cast to ColumnUInt8, so presumably a UInt8 data type - confirm.
    static DataTypePtr getReturnType(const char *, const ColumnsWithTypeAndName &, const ContextPtr &) { return std::make_shared(); }

    /// Number of arguments after the first one.
    static size_t getNumberOfIndexArguments(const ColumnsWithTypeAndName & arguments) { return arguments.size() - 1; }

    /// Appends 1 or 0 to `dest` and always returns true (a value is always inserted).
    static bool insertResultToColumn(IColumn & dest, const Element & root, GeneratorJSONPath & generator_json_path, const ContextPtr &)
    {
        Element current_element = root;
        VisitorStatus status;
        while ((status = generator_json_path.getNextItem(current_element)) != VisitorStatus::Exhausted)
        {
            if (status == VisitorStatus::Ok)
            {
                break;
            }
            /// Start the next attempt from the document root again.
            current_element = root;
        }

        /// insert result, status can be either Ok (if we found the item)
        /// or Exhausted (if we never found the item)
        ColumnUInt8 & col_bool = assert_cast(dest);
        if (status == VisitorStatus::Ok)
        {
            col_bool.insert(1);
        }
        else
        {
            col_bool.insert(0);
        }
        return true;
    }
};

/// Implementation of JSON_VALUE: writes the first item produced by the path
/// generator that is acceptable under the current settings.
// NOTE(review): the template parameter list is missing; the body uses
// `JSONParser` and `JSONStringSerializer` as types.
template class JSONValueImpl
{
public:
    using Element = typename JSONParser::Element;

    /// The return type depends on the function_json_value_return_type_allow_nullable setting.
    // NOTE(review): all three std::make_shared calls have lost their template
    // arguments; presumably a nullable string vs. a plain string type - confirm.
    static DataTypePtr getReturnType(const char *, const ColumnsWithTypeAndName &, const ContextPtr & context)
    {
        if (context->getSettingsRef().function_json_value_return_type_allow_nullable)
        {
            DataTypePtr string_type = std::make_shared();
            return std::make_shared(string_type);
        }
        else
        {
            return std::make_shared();
        }
    }

    /// Number of arguments after the first one.
    static size_t getNumberOfIndexArguments(const ColumnsWithTypeAndName & arguments) { return arguments.size() - 1; }

    /// Writes the selected item to `dest`.
    /// @return false (nothing inserted) if the generator is exhausted without an
    ///         acceptable item, true after a value has been committed.
    static bool insertResultToColumn(IColumn & dest, const Element & root, GeneratorJSONPath & generator_json_path, const ContextPtr & context)
    {
        Element current_element = root;
        VisitorStatus status;

        while ((status = generator_json_path.getNextItem(current_element)) != VisitorStatus::Exhausted)
        {
            if (status == VisitorStatus::Ok)
            {
                /// With function_json_value_return_type_allow_complex any item is accepted;
                /// otherwise arrays and objects are skipped and the search continues.
                if (context->getSettingsRef().function_json_value_return_type_allow_complex)
                {
                    break;
                }
                else if (!(current_element.isArray() || current_element.isObject()))
                {
                    break;
                }
            }
            else if (status == VisitorStatus::Error)
            {
                /// ON ERROR
                /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6),
                /// however this functionality is not implemented yet
            }
            /// Start the next attempt from the document root again.
            current_element = root;
        }

        if (status == VisitorStatus::Exhausted)
            return false;

        /// For a nullable destination, mark the row as not-null (0 in the null map)
        /// and write the text into the nested column.
        ColumnString * col_str = nullptr;
        if (isColumnNullable(dest))
        {
            ColumnNullable & col_null = assert_cast(dest);
            col_null.getNullMapData().push_back(0);
            col_str = assert_cast(&col_null.getNestedColumn());
        }
        else
        {
            col_str = assert_cast(&dest);
        }
        JSONStringSerializer json_serializer(*col_str);
        /// Strings are written via getString()/addRawString instead of addElement
        /// (presumably so the value is emitted without JSON quoting - confirm).
        if (current_element.isString())
        {
            auto str = current_element.getString();
            json_serializer.addRawString(str);
        }
        else
            json_serializer.addElement(current_element);
        json_serializer.commit();
        return true;
    }
};

/**
 * Function to test jsonpath member access, will be removed in final PR
 *
 * Implementation of JSON_QUERY: writes every item produced by the path
 * generator as one text value of the form "[item, item, ...]".
 * @tparam JSONParser parser
 */
// NOTE(review): the template parameter list is missing; the body uses
// `JSONParser` and `JSONStringSerializer` as types.
template class JSONQueryImpl
{
public:
    using Element = typename JSONParser::Element;

    // NOTE(review): std::make_shared has lost its template argument; the result
    // column is later cast to ColumnString, so presumably a string data type - confirm.
    static DataTypePtr getReturnType(const char *, const ColumnsWithTypeAndName &, const ContextPtr &) { return std::make_shared(); }

    /// Number of arguments after the first one.
    static size_t getNumberOfIndexArguments(const ColumnsWithTypeAndName & arguments) { return arguments.size() - 1; }

    /// Writes the bracketed list of matches to `dest`.
    /// @return false (after rolling back the partial output) if nothing matched,
    ///         true after a value has been committed.
    static bool insertResultToColumn(IColumn & dest, const Element & root, GeneratorJSONPath & generator_json_path, const ContextPtr &)
    {
        ColumnString & col_str = assert_cast(dest);

        Element current_element = root;
        VisitorStatus status;
        bool success = false;
        const char * array_begin = "[";
        const char * array_end = "]";
        const char * comma = ", ";
        JSONStringSerializer json_serializer(col_str);
        json_serializer.addRawData(array_begin, 1);
        while ((status = generator_json_path.getNextItem(current_element)) != VisitorStatus::Exhausted)
        {
            if (status == VisitorStatus::Ok)
            {
                /// Separator goes before every item except the first one.
                if (success)
                {
                    json_serializer.addRawData(comma, 2);
                }
                success = true;
                json_serializer.addElement(current_element);
            }
            else if (status == VisitorStatus::Error)
            {
                /// ON ERROR
                /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6),
                /// however this functionality is not implemented yet
            }
            /// Start the next attempt from the document root again.
            current_element = root;
        }
        if (!success)
        {
            /// Discard the already written "[" so the column is left untouched.
            json_serializer.rollback();
            return false;
        }
        json_serializer.addRawData(array_end, 1);
        json_serializer.commit();
        return true;
    }
};

}