dynamic subcolumns: new format and several fixes

This commit is contained in:
Anton Popov 2021-06-08 12:33:04 +03:00
parent 205a23282b
commit 6a5daca135
8 changed files with 122 additions and 51 deletions

View File

@ -7,7 +7,9 @@
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/castColumn.h>
#include <Common/FieldVisitors.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -169,6 +171,17 @@ Names ColumnObject::getKeys() const
return keys;
}
static bool isPrefix(const Strings & prefix, const Strings & strings)
{
if (prefix.size() > strings.size())
return false;
for (size_t i = 0; i < prefix.size(); ++i)
if (prefix[i] != strings[i])
return false;
return true;
}
void ColumnObject::optimizeTypesOfSubcolumns()
{
if (optimized_types_of_subcolumns)
@ -184,11 +197,20 @@ void ColumnObject::optimizeTypesOfSubcolumns()
if (isNothing(getBaseTypeOfArray(to_type)))
continue;
auto it = std::find_if(subcolumns.begin(), subcolumns.end(),
[&name = name](const auto & elem) { return elem.first.size() > name.size() && startsWith(elem.first, name); });
Strings name_parts;
boost::split(name_parts, name, boost::is_any_of("."));
if (it != subcolumns.end())
throw Exception(ErrorCodes::DUPLICATE_COLUMN, "Data in Object has ambiguous paths: '{}' and '{}", name, it->first);
for (const auto & [other_name, _] : subcolumns)
{
if (other_name.size() > name.size())
{
Strings other_name_parts;
boost::split(other_name_parts, other_name, boost::is_any_of("."));
if (isPrefix(name_parts, other_name_parts))
throw Exception(ErrorCodes::DUPLICATE_COLUMN, "Data in Object has ambiguous paths: '{}' and '{}", name, other_name);
}
}
if (to_type->equals(*from_type))
{

View File

@ -151,7 +151,7 @@ double IColumn::getRatioOfDefaultRowsImpl(double sample_ratio) const
return 0.0;
size_t step = num_rows / num_sampled_rows;
std::uniform_int_distribution<size_t> dist(1, step);
std::uniform_int_distribution<size_t> dist(0, step - 1);
size_t res = 0;
for (size_t i = 0; i < num_rows; i += step)

View File

@ -109,7 +109,7 @@ DataTypePtr FieldToDataType::operator() (const Array & x) const
element_types.reserve(x.size());
for (const Field & elem : x)
element_types.emplace_back(applyVisitor(FieldToDataType(), elem));
element_types.emplace_back(applyVisitor(FieldToDataType(allow_convertion_to_string), elem));
return std::make_shared<DataTypeArray>(getLeastSupertype(element_types, allow_convertion_to_string));
}
@ -124,7 +124,7 @@ DataTypePtr FieldToDataType::operator() (const Tuple & tuple) const
element_types.reserve(ext::size(tuple));
for (const auto & element : tuple)
element_types.push_back(applyVisitor(FieldToDataType(), element));
element_types.push_back(applyVisitor(FieldToDataType(allow_convertion_to_string), element));
return std::make_shared<DataTypeTuple>(element_types);
}
@ -140,8 +140,8 @@ DataTypePtr FieldToDataType::operator() (const Map & map) const
{
const auto & tuple = elem.safeGet<const Tuple &>();
assert(tuple.size() == 2);
key_types.push_back(applyVisitor(FieldToDataType(), tuple[0]));
value_types.push_back(applyVisitor(FieldToDataType(), tuple[1]));
key_types.push_back(applyVisitor(FieldToDataType(allow_convertion_to_string), tuple[0]));
value_types.push_back(applyVisitor(FieldToDataType(allow_convertion_to_string), tuple[1]));
}
return std::make_shared<DataTypeMap>(

View File

@ -28,6 +28,7 @@ namespace ErrorCodes
namespace
{
class FieldVisitorReplaceNull : public StaticVisitor<Field>
{
public:

View File

@ -15,6 +15,7 @@ void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
void registerFileSegmentationEngineRegexp(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsString(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsObject(FormatFactory & factory);
/// Formats for both input/output.
@ -76,6 +77,7 @@ void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory);
void registerInputFormatProcessorRegexp(FormatFactory & factory);
void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
void registerInputFormatProcessorJSONAsObject(FormatFactory & factory);
void registerInputFormatProcessorLineAsString(FormatFactory & factory);
void registerInputFormatProcessorCapnProto(FormatFactory & factory);
@ -89,6 +91,7 @@ void registerFormats()
registerFileSegmentationEngineJSONEachRow(factory);
registerFileSegmentationEngineRegexp(factory);
registerFileSegmentationEngineJSONAsString(factory);
registerFileSegmentationEngineJSONAsObject(factory);
registerInputFormatNative(factory);
registerOutputFormatNative(factory);
@ -147,6 +150,7 @@ void registerFormats()
registerInputFormatProcessorRegexp(factory);
registerInputFormatProcessorJSONAsString(factory);
registerInputFormatProcessorJSONAsObject(factory);
registerInputFormatProcessorLineAsString(factory);
#if !defined(ARCADIA_BUILD)

View File

@ -452,7 +452,7 @@ void getArrayJoinedColumns(ASTPtr & query, TreeRewriterResult & result, const AS
bool found = false;
for (const auto & column : source_columns)
{
auto split = Nested::splitName(column.name);
auto split = Nested::splitName(column.name, /*reverse=*/ true);
if (split.first == source_name && !split.second.empty())
{
result.array_join_result_to_source[Nested::concatenateName(result_name, split.second)] = column.name;

View File

@ -0,0 +1,84 @@
#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatSettings.h>
#include <Formats/FormatFactory.h>
#include <Formats/JSONEachRowUtils.h>
#include <Common/assert_cast.h>
#include <IO/ReadHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
class JSONAsObjectRowInputFormat : public IRowInputFormat
{
public:
JSONAsObjectRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "JSONAsObjectRowInputFormat"; }
private:
const FormatSettings format_settings;
};
JSONAsObjectRowInputFormat::JSONAsObjectRowInputFormat(
ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, std::move(params_))
, format_settings(format_settings_)
{
if (header_.columns() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Input format JSONAsObject is only suitable for tables with a single column of type Object but the number of columns is {}",
header_.columns());
if (!isObject(header_.getByPosition(0).type))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Input format JSONAsObject is only suitable for tables with a single column of type Object but the column type is {}",
header_.getByPosition(0).type->getName());
}
bool JSONAsObjectRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
assert(serializations.size() == 1);
assert(columns.size() == 1);
skipWhitespaceIfAny(in);
if (!in.eof())
serializations[0]->deserializeTextJSON(*columns[0], in, format_settings);
skipWhitespaceIfAny(in);
if (!in.eof() && *in.position() == ',')
++in.position();
skipWhitespaceIfAny(in);
return !in.eof();
}
}
void registerInputFormatProcessorJSONAsObject(FormatFactory & factory)
{
factory.registerInputFormatProcessor("JSONAsObject", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<JSONAsObjectRowInputFormat>(buf, sample, std::move(params), settings);
});
}
void registerFileSegmentationEngineJSONAsObject(FormatFactory & factory)
{
factory.registerFileSegmentationEngine("JSONAsObject", &fileSegmentationEngineJSONEachRowImpl);
}
}

View File

@ -1,40 +0,0 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
set -e
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS t_json"
$CLICKHOUSE_CLIENT -q "CREATE TABLE t_json(id UInt64, data Object('JSON')) \
ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part = 0"
cat << EOF | $CLICKHOUSE_CLIENT -q "INSERT INTO t_json FORMAT CSV"
1,{"k1":"aa","k2":{"k3":"bb","k4":"c"}}
2,{"k1":"ee","k5":"ff"}
EOF
cat << EOF | $CLICKHOUSE_CLIENT -q "INSERT INTO t_json FORMAT CSV"
3,{"k5":"foo"}
EOF
$CLICKHOUSE_CLIENT -q "SELECT id, data.k1, data.k2.k3, data.k2.k4, data.k5 FROM t_json ORDER BY id"
$CLICKHOUSE_CLIENT -q "SELECT name, column, type \
FROM system.parts_columns WHERE table = 't_json' AND database = '$CLICKHOUSE_DATABASE' ORDER BY name"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS t_json"
$CLICKHOUSE_CLIENT -q "CREATE TABLE t_json(id UInt64, data Object('JSON')) \
ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part = 0"
cat << EOF | $CLICKHOUSE_CLIENT -q "INSERT INTO t_json FORMAT CSV"
1,{"k1":[{"k2":"aaa","k3":[{"k4":"bbb"},{"k4":"ccc"}]},{"k2":"ddd","k3":[{"k4":"eee"},{"k4":"fff"}]}]}
EOF
$CLICKHOUSE_CLIENT -q "SELECT id, data.k1.k2, data.k1.k3.k4 FROM t_json ORDER BY id"
$CLICKHOUSE_CLIENT -q "SELECT name, column, type \
FROM system.parts_columns WHERE table = 't_json' AND database = '$CLICKHOUSE_DATABASE' ORDER BY name"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS t_json"