Merge pull request #35409 from CurtizJ/dynamic-columns-3

Fix race in data type `Object`
This commit is contained in:
Anton Popov 2022-03-19 22:18:50 +01:00 committed by GitHub
commit ed67ba14b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 123 additions and 63 deletions

View File

@ -253,6 +253,29 @@ void ColumnObject::Subcolumn::insert(Field field)
insert(std::move(field), std::move(info));
}
/// Appends a new part to @data for values of the given type and makes
/// that type the current least common type of the subcolumn.
/// The column of the new part is created by the type itself using
/// its serialization of kind ISerialization::Kind::SPARSE.
void ColumnObject::Subcolumn::addNewColumnPart(DataTypePtr type)
{
    const auto sparse_serialization = type->getSerialization(ISerialization::Kind::SPARSE);
    auto new_part = type->createColumn(*sparse_serialization);
    data.push_back(std::move(new_part));

    /// The type is moved from only here, after its last use above.
    least_common_type = LeastCommonType{std::move(type)};
}
static bool isConversionRequiredBetweenIntegers(const IDataType & lhs, const IDataType & rhs)
{
/// If both of types are signed/unsigned integers and size of left field type
/// is less than right type, we don't need to convert field,
/// because all integer fields are stored in Int64/UInt64.
WhichDataType which_lhs(lhs);
WhichDataType which_rhs(rhs);
bool is_native_int = which_lhs.isNativeInt() && which_rhs.isNativeInt();
bool is_native_uint = which_lhs.isNativeUInt() && which_rhs.isNativeUInt();
return (is_native_int || is_native_uint)
&& lhs.getSizeOfValueInMemory() <= rhs.getSizeOfValueInMemory();
}
void ColumnObject::Subcolumn::insert(Field field, FieldInfo info)
{
auto base_type = std::move(info.scalar_type);
@ -263,10 +286,10 @@ void ColumnObject::Subcolumn::insert(Field field, FieldInfo info)
return;
}
auto column_dim = getNumberOfDimensions(*least_common_type);
auto column_dim = least_common_type.getNumberOfDimensions();
auto value_dim = info.num_dimensions;
if (isNothing(least_common_type))
if (isNothing(least_common_type.get()))
column_dim = value_dim;
if (field.isNull())
@ -284,29 +307,26 @@ void ColumnObject::Subcolumn::insert(Field field, FieldInfo info)
if (!is_nullable && info.have_nulls)
field = applyVisitor(FieldVisitorReplaceNull(base_type->getDefault(), value_dim), std::move(field));
auto value_type = createArrayOfType(base_type, value_dim);
bool type_changed = false;
const auto & least_common_base_type = least_common_type.getBase();
if (data.empty())
{
auto serialization = value_type->getSerialization(ISerialization::Kind::SPARSE);
data.push_back(value_type->createColumn(*serialization));
least_common_type = value_type;
addNewColumnPart(createArrayOfType(std::move(base_type), value_dim));
}
else if (!least_common_type->equals(*value_type))
else if (!least_common_base_type->equals(*base_type) && !isNothing(base_type))
{
value_type = getLeastSupertype(DataTypes{value_type, least_common_type}, true);
type_changed = true;
if (!least_common_type->equals(*value_type))
if (!isConversionRequiredBetweenIntegers(*base_type, *least_common_base_type))
{
auto serialization = value_type->getSerialization(ISerialization::Kind::SPARSE);
data.push_back(value_type->createColumn(*serialization));
least_common_type = value_type;
base_type = getLeastSupertype(DataTypes{std::move(base_type), least_common_base_type}, true);
type_changed = true;
if (!least_common_base_type->equals(*base_type))
addNewColumnPart(createArrayOfType(std::move(base_type), value_dim));
}
}
if (type_changed || info.need_convert)
field = convertFieldToTypeOrThrow(field, *value_type);
field = convertFieldToTypeOrThrow(field, *least_common_type.get());
data.back()->insert(field);
}
@ -316,28 +336,24 @@ void ColumnObject::Subcolumn::insertRangeFrom(const Subcolumn & src, size_t star
assert(src.isFinalized());
const auto & src_column = src.data.back();
const auto & src_type = src.least_common_type;
const auto & src_type = src.least_common_type.get();
if (data.empty())
{
least_common_type = src_type;
data.push_back(src_type->createColumn());
addNewColumnPart(src.least_common_type.get());
data.back()->insertRangeFrom(*src_column, start, length);
}
else if (least_common_type->equals(*src_type))
else if (least_common_type.get()->equals(*src_type))
{
data.back()->insertRangeFrom(*src_column, start, length);
}
else
{
auto new_least_common_type = getLeastSupertype(DataTypes{least_common_type, src_type}, true);
auto new_least_common_type = getLeastSupertype(DataTypes{least_common_type.get(), src_type}, true);
auto casted_column = castColumn({src_column, src_type, ""}, new_least_common_type);
if (!least_common_type->equals(*new_least_common_type))
{
least_common_type = new_least_common_type;
data.push_back(least_common_type->createColumn());
}
if (!least_common_type.get()->equals(*new_least_common_type))
addNewColumnPart(std::move(new_least_common_type));
data.back()->insertRangeFrom(*casted_column, start, length);
}
@ -360,7 +376,7 @@ void ColumnObject::Subcolumn::finalize()
return;
}
const auto & to_type = least_common_type;
const auto & to_type = least_common_type.get();
auto result_column = to_type->createColumn();
if (num_of_defaults_in_prefix)
@ -462,7 +478,7 @@ ColumnObject::Subcolumn ColumnObject::Subcolumn::recreateWithDefaultValues(const
scalar_type = makeNullable(scalar_type);
Subcolumn new_subcolumn;
new_subcolumn.least_common_type = createArrayOfType(scalar_type, field_info.num_dimensions);
new_subcolumn.least_common_type = LeastCommonType{createArrayOfType(scalar_type, field_info.num_dimensions)};
new_subcolumn.is_nullable = is_nullable;
new_subcolumn.num_of_defaults_in_prefix = num_of_defaults_in_prefix;
new_subcolumn.data.reserve(data.size());
@ -492,6 +508,13 @@ const ColumnPtr & ColumnObject::Subcolumn::getFinalizedColumnPtr() const
return data[0];
}
/// Stores the given type and, along with it, the results of
/// getBaseTypeOfArray and DB::getNumberOfDimensions computed for this type.
/// NOTE: the initializers of 'base_type' and 'num_dimensions' read the member 'type'
/// (not the moved-from argument 'type_'), so they rely on 'type' being declared
/// before them in the class.
ColumnObject::Subcolumn::LeastCommonType::LeastCommonType(DataTypePtr type_)
: type(std::move(type_))
, base_type(getBaseTypeOfArray(type))
, num_dimensions(DB::getNumberOfDimensions(*type))
{
}
ColumnObject::ColumnObject(bool is_nullable_)
: is_nullable(is_nullable_)
, num_rows(0)

View File

@ -67,7 +67,7 @@ public:
size_t allocatedBytes() const;
bool isFinalized() const;
const DataTypePtr & getLeastCommonType() const { return least_common_type; }
const DataTypePtr & getLeastCommonType() const { return least_common_type.get(); }
/// Checks the consistency of column's parts stored in @data.
void checkTypes() const;
@ -102,8 +102,26 @@ public:
friend class ColumnObject;
private:
/// Least common type of a subcolumn stored together with two values derived from it:
/// its base type and its number of dimensions. All getters return the stored values.
class LeastCommonType
{
public:
/// Leaves the type pointers default-initialized; the number of dimensions is 0.
LeastCommonType() = default;
/// Stores @type_ and fills the derived members from it (defined out of line).
explicit LeastCommonType(DataTypePtr type_);
/// The full type, as passed to the constructor.
const DataTypePtr & get() const { return type; }
/// The type obtained from the full type by getBaseTypeOfArray.
const DataTypePtr & getBase() const { return base_type; }
/// The number of dimensions obtained from the full type.
size_t getNumberOfDimensions() const { return num_dimensions; }
private:
/// NOTE: 'type' must stay the first member: the constructor's
/// initializers of the members below read it.
DataTypePtr type;
DataTypePtr base_type;
size_t num_dimensions = 0;
};
void addNewColumnPart(DataTypePtr type);
/// Current least common type of all values inserted to this subcolumn.
DataTypePtr least_common_type;
LeastCommonType least_common_type;
/// If true then common type of subcolumn is Nullable
/// and default values are NULLs.

View File

@ -19,7 +19,6 @@ namespace ErrorCodes
DataTypeObject::DataTypeObject(const String & schema_format_, bool is_nullable_)
: schema_format(Poco::toLower(schema_format_))
, is_nullable(is_nullable_)
, default_serialization(getObjectSerialization(schema_format))
{
}
@ -32,7 +31,7 @@ bool DataTypeObject::equals(const IDataType & rhs) const
SerializationPtr DataTypeObject::doGetDefaultSerialization() const
{
return default_serialization;
return getObjectSerialization(schema_format);
}
String DataTypeObject::doGetName() const

View File

@ -18,7 +18,6 @@ class DataTypeObject : public IDataType
private:
String schema_format;
bool is_nullable;
SerializationPtr default_serialization;
public:
DataTypeObject(const String & schema_format_, bool is_nullable_);

View File

@ -35,7 +35,7 @@ class JSONDataParser
public:
using Element = typename ParserImpl::Element;
void readJSON(String & s, ReadBuffer & buf)
static void readJSON(String & s, ReadBuffer & buf)
{
readJSONObjectPossiblyInvalid(s, buf);
}

View File

@ -36,15 +36,15 @@ PathInData::PathInData(std::string_view path_)
}
PathInData::PathInData(const Parts & parts_)
: path(buildPath(parts_))
, parts(buildParts(path, parts_))
{
buildPath(parts_);
buildParts(parts_);
}
PathInData::PathInData(const PathInData & other)
: path(other.path)
, parts(buildParts(path, other.getParts()))
{
buildParts(other.getParts());
}
PathInData & PathInData::operator=(const PathInData & other)
@ -52,7 +52,7 @@ PathInData & PathInData::operator=(const PathInData & other)
if (this != &other)
{
path = other.path;
parts = buildParts(path, other.parts);
buildParts(other.parts);
}
return *this;
}
@ -79,8 +79,8 @@ void PathInData::writeBinary(WriteBuffer & out) const
for (const auto & part : parts)
{
writeStringBinary(part.key, out);
writeVarUInt(part.is_nested, out);
writeVarUInt(part.anonymous_array_level, out);
writeIntBinary(part.is_nested, out);
writeIntBinary(part.anonymous_array_level, out);
}
}
@ -99,48 +99,47 @@ void PathInData::readBinary(ReadBuffer & in)
UInt8 anonymous_array_level;
auto ref = readStringBinaryInto(arena, in);
readVarUInt(is_nested, in);
readVarUInt(anonymous_array_level, in);
readIntBinary(is_nested, in);
readIntBinary(anonymous_array_level, in);
temp_parts.emplace_back(static_cast<std::string_view>(ref), is_nested, anonymous_array_level);
}
/// Recreate path and parts.
path = buildPath(temp_parts);
parts = buildParts(path, temp_parts);
buildPath(temp_parts);
buildParts(temp_parts);
}
String PathInData::buildPath(const Parts & other_parts)
void PathInData::buildPath(const Parts & other_parts)
{
if (other_parts.empty())
return "";
return;
String res;
path.clear();
auto it = other_parts.begin();
res += it->key;
path += it->key;
++it;
for (; it != other_parts.end(); ++it)
{
res += ".";
res += it->key;
path += ".";
path += it->key;
}
return res;
}
PathInData::Parts PathInData::buildParts(const String & other_path, const Parts & other_parts)
void PathInData::buildParts(const Parts & other_parts)
{
if (other_parts.empty())
return {};
return;
Parts res;
const char * begin = other_path.data();
parts.clear();
parts.reserve(other_parts.size());
const char * begin = path.data();
for (const auto & part : other_parts)
{
res.emplace_back(std::string_view{begin, part.key.length()}, part.is_nested, part.anonymous_array_level);
has_nested |= part.is_nested;
parts.emplace_back(std::string_view{begin, part.key.length()}, part.is_nested, part.anonymous_array_level);
begin += part.key.length() + 1;
}
return res;
}
size_t PathInData::Hash::operator()(const PathInData & value) const

View File

@ -55,7 +55,7 @@ public:
const Parts & getParts() const { return parts; }
bool isNested(size_t i) const { return parts[i].is_nested; }
bool hasNested() const { return std::any_of(parts.begin(), parts.end(), [](const auto & part) { return part.is_nested; }); }
bool hasNested() const { return has_nested; }
void writeBinary(WriteBuffer & out) const;
void readBinary(ReadBuffer & in);
@ -65,16 +65,20 @@ public:
private:
/// Creates full path from parts.
static String buildPath(const Parts & other_parts);
void buildPath(const Parts & other_parts);
/// Creates new parts from the full path with correct string pointers.
static Parts buildParts(const String & other_path, const Parts & other_parts);
void buildParts(const Parts & other_parts);
/// The full path. Parts are separated by dots.
String path;
/// Parts of the path. All string_view-s in parts must point to the @path.
Parts parts;
/// True if at least one part is nested.
/// Cached to avoid linear complexity at 'hasNested'.
bool has_nested = false;
};
class PathInDataBuilder

View File

@ -68,7 +68,7 @@ using Node = typename ColumnObject::SubcolumnsTree::Node;
/// Finds a subcolumn from the same Nested type as @entry and inserts
/// an array with default values with consistent sizes as in Nested type.
bool tryInsertDefaultFromNested(
std::shared_ptr<Node> entry, const ColumnObject::SubcolumnsTree & subcolumns)
const std::shared_ptr<Node> & entry, const ColumnObject::SubcolumnsTree & subcolumns)
{
if (!entry->path.hasNested())
return false;
@ -134,8 +134,13 @@ void SerializationObject<Parser>::deserializeTextImpl(IColumn & column, Reader &
String buf;
reader(buf);
std::optional<ParseResult> result;
{
auto parser = parsers_pool.get([] { return new Parser; });
result = parser->parse(buf.data(), buf.size());
}
auto result = parser.parse(buf.data(), buf.size());
if (!result)
throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot parse object");
@ -205,7 +210,7 @@ void SerializationObject<Parser>::deserializeTextQuoted(IColumn & column, ReadBu
template <typename Parser>
void SerializationObject<Parser>::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
deserializeTextImpl(column, [&](String & s) { parser.readJSON(s, istr); });
deserializeTextImpl(column, [&](String & s) { Parser::readJSON(s, istr); });
}
template <typename Parser>

View File

@ -1,6 +1,7 @@
#pragma once
#include <DataTypes/Serializations/SimpleTextSerialization.h>
#include <Common/ObjectPool.h>
namespace DB
{
@ -65,7 +66,8 @@ private:
void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const;
mutable Parser parser;
/// Pool of parser objects to make SerializationObject thread safe.
mutable SimpleObjectPool<Parser> parsers_pool;
};
SerializationPtr getObjectSerialization(const String & schema_format);

View File

@ -0,0 +1 @@
Tuple(k1 Int8, k2 String) 3000000

View File

@ -0,0 +1,10 @@
-- Tags: long
-- Inserts 3000000 identical JSON rows using up to 20 insert threads
-- and then checks the resulting type of the column and the number of rows.
DROP TABLE IF EXISTS t_json_parallel;
-- The JSON type is experimental and must be enabled explicitly.
SET allow_experimental_object_type = 1, max_insert_threads = 20, max_threads = 20;
CREATE TABLE t_json_parallel (data JSON) ENGINE = MergeTree ORDER BY tuple();
-- materialize() turns the constant string into a full column; numbers_mt is the multithreaded source of rows.
INSERT INTO t_json_parallel SELECT materialize('{"k1":1, "k2": "some"}') FROM numbers_mt(3000000);
-- Outputs a single row: the type name of the column and the total number of rows.
SELECT any(toTypeName(data)), count() FROM t_json_parallel;
DROP TABLE t_json_parallel;