ColumnSparse: support of functions

This commit is contained in:
Anton Popov 2021-03-31 04:08:27 +03:00
parent 372a1b1fe7
commit 17071471f2
14 changed files with 122 additions and 23 deletions

View File

@ -85,11 +85,14 @@ ColumnPtr ColumnSparse::convertToFullColumnIfSparse() const
size_t offsets_diff = offsets_data[i] - current_offset;
current_offset = offsets_data[i];
if (offsets_diff > 1)
res->insertManyDefaults(offsets_diff - 1);
res->insertManyFrom(*values, 0, offsets_diff - 1);
res->insertFrom(*values, i + 1);
}
res->insertManyDefaults(_size - current_offset);
size_t offsets_diff = _size - current_offset;
if(offsets_diff > 1)
res->insertManyFrom(*values, 0, offsets_diff - 1);
return res;
}
@ -111,6 +114,11 @@ const char * ColumnSparse::deserializeAndInsertFromArena(const char * pos)
throwMustBeDense();
}
const char * ColumnSparse::skipSerializedInArena(const char *) const
{
throwMustBeDense();
}
void ColumnSparse::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{
size_t end = start + length;
@ -278,14 +286,10 @@ ColumnPtr ColumnSparse::index(const IColumn & indexes, size_t limit) const
int ColumnSparse::compareAt(size_t n, size_t m, const IColumn & rhs_, int null_direction_hint) const
{
UNUSED(n);
UNUSED(m);
UNUSED(rhs_);
UNUSED(null_direction_hint);
if (const auto * rhs_sparse = typeid_cast<const ColumnSparse *>(&rhs_))
return values->compareAt(getValueIndex(n), rhs_sparse->getValueIndex(m), rhs_sparse->getValuesColumn(), null_direction_hint);
std::cerr << "rhs: " << rhs_.dumpStructure() << "\n";
throwMustBeDense();
return values->compareAt(getValueIndex(n), m, rhs_, null_direction_hint);
}
void ColumnSparse::compareColumn(const IColumn & rhs, size_t rhs_row_num,

View File

@ -51,6 +51,7 @@ public:
return Base::create(std::forward<Arg>(arg));
}
bool isSparse() const override { return true; }
const char * getFamilyName() const override { return "Sparse"; }
std::string getName() const override { return "Sparse(" + values->getName() + ")"; }
TypeIndex getDataType() const override { return values->getDataType(); }
@ -69,6 +70,7 @@ public:
void insertData(const char * pos, size_t length) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * skipSerializedInArena(const char *) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
void insert(const Field & x) override;
void insertFrom(const IColumn & src, size_t n) override;

View File

@ -50,6 +50,9 @@ ConstantFilterDescription::ConstantFilterDescription(const IColumn & column)
FilterDescription::FilterDescription(const IColumn & column_)
{
if (column_.isSparse())
data_holder = column_.convertToFullColumnIfSparse();
if (column_.lowCardinality())
data_holder = column_.convertToFullColumnIfLowCardinality();

View File

@ -460,6 +460,8 @@ public:
virtual bool lowCardinality() const { return false; }
virtual bool isSparse() const { return false; }
virtual bool isCollationSupported() const { return false; }
virtual ~IColumn() = default;

View File

@ -153,7 +153,7 @@ Block NativeBlockInputStream::readImpl()
column.type = data_type_factory.get(type_name);
/// TODO: check revision.
SerializationKind serialization_kind;
ISerialization::Kind serialization_kind;
readIntBinary(serialization_kind, istr);
if (use_index)
@ -167,7 +167,7 @@ Block NativeBlockInputStream::readImpl()
/// Data
ColumnPtr read_column = column.type->createColumn();
if (serialization_kind == SerializationKind::SPARSE)
if (serialization_kind == ISerialization::Kind::SPARSE)
read_column = ColumnSparse::create(read_column);
double avg_value_size_hint = avg_value_size_hints.empty() ? 0 : avg_value_size_hints[i];

View File

@ -119,6 +119,10 @@ void NativeBlockOutputStream::write(const Block & block)
writeStringBinary(type_name, ostr);
std::cerr << "column before: " << column.column->dumpStructure() << "\n";
column.column = column.column->convertToFullColumnIfSparse();
std::cerr << "column after: " << column.column->dumpStructure() << "\n";
/// TODO: add revision
auto serialization = column.type->getSerialization(*column.column);
writeIntBinary(serialization->getKind(), ostr);

View File

@ -14,6 +14,17 @@ namespace ErrorCodes
extern const int MULTIPLE_STREAMS_REQUIRED;
}
String ISerialization::kindToString(Kind kind)
{
switch (kind)
{
case Kind::DEFAULT:
return "Default";
case Kind::SPARSE:
return "Sparse";
}
}
String ISerialization::Substream::toString() const
{
switch (type)

View File

@ -32,18 +32,21 @@ class Field;
struct FormatSettings;
struct NameAndTypePair;
enum class SerializationKind : UInt8
{
DEFAULT = 0,
SPARSE = 1
};
class ISerialization
{
public:
ISerialization() = default;
virtual ~ISerialization() = default;
enum class Kind : UInt8
{
DEFAULT = 0,
SPARSE = 1
};
virtual Kind getKind() const { return Kind::DEFAULT; }
static String kindToString(Kind kind);
/** Binary serialization for range of values in column - for writing to disk/network, etc.
*
* Some data types are represented in multiple streams while being serialized.
@ -103,8 +106,6 @@ public:
String toString() const;
};
virtual SerializationKind getKind() const { return SerializationKind::DEFAULT; }
/// Cache for common substreams of one type, but possible different its subcolumns.
/// E.g. sizes of arrays of Nested data type.
using SubstreamsCache = std::unordered_map<String, ColumnPtr>;

View File

@ -10,7 +10,7 @@ class SerializationSparse final : public SerializationWrapper
public:
SerializationSparse(const SerializationPtr & nested_);
SerializationKind getKind() const override { return SerializationKind::SPARSE; }
Kind getKind() const override { return Kind::SPARSE; }
void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;

View File

@ -8,6 +8,7 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnSparse.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/Native.h>
@ -379,7 +380,13 @@ static void convertLowCardinalityColumnsToFull(ColumnsWithTypeAndName & args)
}
}
ColumnPtr ExecutableFunctionAdaptor::execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const
static void convertSparseColumnsToFull(ColumnsWithTypeAndName & args)
{
for (auto & column : args)
column.column = column.column->convertToFullColumnIfSparse();
}
ColumnPtr ExecutableFunctionAdaptor::executeWithoutSparseColumns(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const
{
if (impl->useDefaultImplementationForLowCardinalityColumns())
{
@ -452,6 +459,59 @@ ColumnPtr ExecutableFunctionAdaptor::execute(const ColumnsWithTypeAndName & argu
return executeWithoutLowCardinalityColumns(arguments, result_type, input_rows_count, dry_run);
}
ColumnPtr ExecutableFunctionAdaptor::execute(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const
{
if (impl->useDefaultImplementationForSparseColumns())
{
size_t num_sparse_columns = 0;
size_t num_full_columns = 0;
size_t sparse_column_position = 0;
for (size_t i = 0; i < arguments.size(); ++i)
{
if (typeid_cast<const ColumnSparse *>(arguments[i].column.get()))
{
sparse_column_position = i;
++num_sparse_columns;
}
else if (!isColumnConst(*arguments[i].column))
{
++num_full_columns;
}
}
auto columns_without_sparse = arguments;
if (num_sparse_columns == 1 && num_full_columns == 0)
{
auto & arg_with_sparse = columns_without_sparse[sparse_column_position];
ColumnPtr sparse_offsets;
{
/// New scope to avoid possible mistakes on dangling reference.
const auto & column_sparse = assert_cast<const ColumnSparse &>(*arg_with_sparse.column);
sparse_offsets = column_sparse.getOffsetsPtr();
arg_with_sparse.column = column_sparse.getValuesPtr();
}
size_t values_size = arg_with_sparse.column->size();
for (size_t i = 0; i < columns_without_sparse.size(); ++i)
{
if (i == sparse_column_position)
continue;
columns_without_sparse[i].column = columns_without_sparse[i].column->cloneResized(values_size);
}
auto res = executeWithoutSparseColumns(columns_without_sparse, result_type, input_rows_count, dry_run);
return ColumnSparse::create(res, sparse_offsets, input_rows_count);
}
convertSparseColumnsToFull(columns_without_sparse);
return executeWithoutSparseColumns(columns_without_sparse, result_type, input_rows_count, dry_run);
}
return executeWithoutSparseColumns(arguments, result_type, input_rows_count, dry_run);
}
void FunctionOverloadResolverAdaptor::checkNumberOfArguments(size_t number_of_arguments) const
{
if (isVariadic())

View File

@ -32,6 +32,9 @@ private:
ColumnPtr executeWithoutLowCardinalityColumns(
const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const;
ColumnPtr executeWithoutSparseColumns(
const ColumnsWithTypeAndName & args, const DataTypePtr & result_type, size_t input_rows_count, bool dry_run) const;
};
class FunctionBaseAdaptor final : public IFunctionBase

View File

@ -62,6 +62,8 @@ public:
*/
virtual bool useDefaultImplementationForLowCardinalityColumns() const { return true; }
virtual bool useDefaultImplementationForSparseColumns() const { return true; }
/** Some arguments could remain constant during this implementation.
*/
virtual ColumnNumbers getArgumentsThatAreAlwaysConstant() const { return {}; }

View File

@ -197,7 +197,7 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
Block block_to_write = block;
for (auto & col : block_to_write)
{
if (serializations[col.name]->getKind() != SerializationKind::SPARSE)
if (serializations[col.name]->getKind() != ISerialization::Kind::SPARSE)
col.column = col.column->convertToFullColumnIfSparse();
}

View File

@ -56,7 +56,8 @@ StorageSystemPartsColumns::StorageSystemPartsColumns(const StorageID & table_id_
{"column_bytes_on_disk", std::make_shared<DataTypeUInt64>()},
{"column_data_compressed_bytes", std::make_shared<DataTypeUInt64>()},
{"column_data_uncompressed_bytes", std::make_shared<DataTypeUInt64>()},
{"column_marks_bytes", std::make_shared<DataTypeUInt64>()}
{"column_marks_bytes", std::make_shared<DataTypeUInt64>()},
{"serialization_kind", std::make_shared<DataTypeString>()}
}
)
{
@ -212,6 +213,12 @@ void StorageSystemPartsColumns::processNextStorage(
if (columns_mask[src_index++])
columns[res_index++]->insert(column_size.marks);
if (columns_mask[src_index++])
{
auto kind = part->getSerializationForColumn(column)->getKind();
columns[res_index++]->insert(ISerialization::kindToString(kind));
}
if (has_state_column)
columns[res_index++]->insert(part->stateString());
}