Merge pull request #1574 from yandex/unify-data-types-that-serialized-with-multiple-streams

Unify data types that are serialized with multiple streams
alexey-milovidov 2017-12-01 17:27:04 +03:00 committed by GitHub
commit 9dcdb86ae5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
76 changed files with 854 additions and 1349 deletions

View File

@ -5,6 +5,9 @@
#include <Columns/ColumnsCommon.h> #include <Columns/ColumnsCommon.h>
#include <DataStreams/ColumnGathererStream.h> #include <DataStreams/ColumnGathererStream.h>
/// Used in the `reserve` method, when the number of rows is known, but sizes of elements are not.
#define APPROX_STRING_SIZE 64
namespace DB namespace DB
{ {
@ -257,7 +260,7 @@ void ColumnString::gather(ColumnGathererStream & gatherer)
void ColumnString::reserve(size_t n) void ColumnString::reserve(size_t n)
{ {
offsets.reserve(n); offsets.reserve(n);
chars.reserve(n * DBMS_APPROX_STRING_SIZE); chars.reserve(n * APPROX_STRING_SIZE);
} }
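As a rough illustration of the constant that replaces DBMS_APPROX_STRING_SIZE here, below is a minimal standalone sketch (not the real ColumnString; ToyStringColumn and its members are made up for the example) of reserving the character buffer from an approximate average string size.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

/// Used when the number of rows is known but element sizes are not.
#define APPROX_STRING_SIZE 64

// Toy stand-in for ColumnString: concatenated chars plus cumulative offsets.
struct ToyStringColumn
{
    std::vector<char> chars;
    std::vector<uint64_t> offsets;

    void reserve(size_t n)
    {
        offsets.reserve(n);
        chars.reserve(n * APPROX_STRING_SIZE);   // guess ~64 bytes per string
    }

    void insert(const std::string & s)
    {
        chars.insert(chars.end(), s.begin(), s.end());
        chars.push_back('\0');
        offsets.push_back(chars.size());
    }
};

int main()
{
    ToyStringColumn column;
    column.reserve(1000);        // avoids most reallocations for typical strings
    for (size_t i = 0; i < 1000; ++i)
        column.insert("hello");
}
```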

View File

@ -361,6 +361,7 @@ namespace ErrorCodes
extern const int HTTP_LENGTH_REQUIRED = 381; extern const int HTTP_LENGTH_REQUIRED = 381;
extern const int CANNOT_LOAD_CATBOOST_MODEL = 382; extern const int CANNOT_LOAD_CATBOOST_MODEL = 382;
extern const int CANNOT_APPLY_CATBOOST_MODEL = 383; extern const int CANNOT_APPLY_CATBOOST_MODEL = 383;
extern const int MULTIPLE_STREAMS_REQUIRED = 384;
extern const int KEEPER_EXCEPTION = 999; extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;

View File

@ -55,11 +55,10 @@
#define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum waiting time in the request queue. #define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum waiting time in the request queue.
#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 16 #define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 16
/// Used in the `reserve` method, when the number of rows is known, but their dimensions are unknown.
#define DBMS_APPROX_STRING_SIZE 64
/// Name suffix for the column containing the array offsets. /// Name suffix for the column containing the array offsets.
#define ARRAY_SIZES_COLUMN_NAME_SUFFIX ".size" #define ARRAY_SIZES_COLUMN_NAME_SUFFIX ".size"
/// And NULL map.
#define NULL_MAP_COLUMN_NAME_SUFFIX ".null"
#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032 #define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058 #define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058

View File

@ -48,40 +48,8 @@ NativeBlockInputStream::NativeBlockInputStream(
void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint) void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
{ {
if (type.isNullable()) IDataType::InputStreamGetter input_stream_getter = [&] (const IDataType::SubstreamPath & path) { return &istr; };
{ type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, avg_value_size_hint, false, {});
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *nullable_type.getNestedType();
ColumnNullable & nullable_col = static_cast<ColumnNullable &>(column);
IColumn & nested_col = *nullable_col.getNestedColumn();
IColumn & null_map = nullable_col.getNullMapConcreteColumn();
DataTypeUInt8{}.deserializeBinaryBulk(null_map, istr, rows, avg_value_size_hint);
readData(nested_type, nested_col, istr, rows, avg_value_size_hint);
return;
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/** For arrays, we deserialize the offsets first, and then the values.
*/
IColumn & offsets_column = *typeid_cast<ColumnArray &>(column).getOffsetsColumn();
type_arr->getOffsetsType()->deserializeBinaryBulk(offsets_column, istr, rows, 0);
if (offsets_column.size() != rows)
throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);
if (rows)
readData(
*type_arr->getNestedType(),
typeid_cast<ColumnArray &>(column).getData(),
istr,
typeid_cast<const ColumnArray &>(column).getOffsets()[rows - 1], 0);
}
else
type.deserializeBinaryBulk(column, istr, rows, avg_value_size_hint);
if (column.size() != rows) if (column.size() != rows)
throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA); throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);

View File

@ -60,56 +60,8 @@ void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr
else else
full_column = column; full_column = column;
if (type.isNullable()) IDataType::OutputStreamGetter output_stream_getter = [&] (const IDataType::SubstreamPath & path) { return &ostr; };
{ type.serializeBinaryBulkWithMultipleStreams(*full_column, output_stream_getter, offset, limit, false, {});
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *nullable_type.getNestedType();
const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(*full_column.get());
const ColumnPtr & nested_col = nullable_col.getNestedColumn();
const IColumn & null_map = nullable_col.getNullMapConcreteColumn();
DataTypeUInt8{}.serializeBinaryBulk(null_map, ostr, offset, limit);
writeData(nested_type, nested_col, ostr, offset, limit);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/** For arrays, we serialize the offsets first, and then the values.
*/
const ColumnArray & column_array = typeid_cast<const ColumnArray &>(*full_column);
type_arr->getOffsetsType()->serializeBinaryBulk(*column_array.getOffsetsColumn(), ostr, offset, limit);
if (!column_array.getData().empty())
{
const ColumnArray::Offsets_t & offsets = column_array.getOffsets();
if (offset > offsets.size())
return;
/** offset - from which array to write.
* limit - how many arrays should be written, or 0, if you write everything that is.
* end - up to which array written part finishes.
*
* nested_offset - from which nested element to write.
* nested_limit - how many nested elements to write, or 0, if you write everything that is.
*/
size_t end = std::min(offset + limit, offsets.size());
size_t nested_offset = offset ? offsets[offset - 1] : 0;
size_t nested_limit = limit
? offsets[end - 1] - nested_offset
: 0;
const DataTypePtr & nested_type = type_arr->getNestedType();
if (limit == 0 || nested_limit)
writeData(*nested_type, column_array.getDataPtr(), ostr, nested_offset, nested_limit);
}
}
else
type.serializeBinaryBulk(*full_column, ostr, offset, limit);
} }
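The Native format (both the input and output streams above) still stores every substream in one contiguous buffer, so the getter simply ignores the substream path. A minimal standalone sketch of that pattern, with std::ostringstream standing in for WriteBuffer and toy Substream/SubstreamPath types (all names here are illustrative):

```cpp
#include <functional>
#include <iostream>
#include <sstream>
#include <vector>

// Toy stand-ins for the real types.
struct Substream { enum Type { ArrayElements, ArraySizes, NullableElements, NullMap } type; };
using SubstreamPath = std::vector<Substream>;
using OutputStreamGetter = std::function<std::ostream * (const SubstreamPath &)>;

int main()
{
    std::ostringstream single_buffer;

    // Every substream path resolves to the same buffer, so sizes, null maps
    // and values end up interleaved exactly as in the old single-stream code.
    OutputStreamGetter getter = [&](const SubstreamPath &) { return &single_buffer; };

    *getter({}) << "values ";
    *getter({Substream{Substream::ArraySizes}}) << "sizes";

    std::cout << single_buffer.str() << '\n';   // "values sizes"
}
```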

View File

@ -107,12 +107,87 @@ void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr) const
} }
void DataTypeArray::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const namespace
{
void serializeArraySizesPositionIndependent(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit)
{
const ColumnArray & column_array = typeid_cast<const ColumnArray &>(column);
const ColumnArray::Offsets_t & offset_values = column_array.getOffsets();
size_t size = offset_values.size();
if (!size)
return;
size_t end = limit && (offset + limit < size)
? offset + limit
: size;
ColumnArray::Offset_t prev_offset = offset == 0 ? 0 : offset_values[offset - 1];
for (size_t i = offset; i < end; ++i)
{
ColumnArray::Offset_t current_offset = offset_values[i];
writeIntBinary(current_offset - prev_offset, ostr);
prev_offset = current_offset;
}
}
void deserializeArraySizesPositionIndependent(IColumn & column, ReadBuffer & istr, size_t limit)
{
ColumnArray & column_array = typeid_cast<ColumnArray &>(column);
ColumnArray::Offsets_t & offset_values = column_array.getOffsets();
size_t initial_size = offset_values.size();
offset_values.resize(initial_size + limit);
size_t i = initial_size;
ColumnArray::Offset_t current_offset = initial_size ? offset_values[initial_size - 1] : 0;
while (i < initial_size + limit && !istr.eof())
{
ColumnArray::Offset_t current_size = 0;
readIntBinary(current_size, istr);
current_offset += current_size;
offset_values[i] = current_offset;
++i;
}
offset_values.resize(i);
}
}
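Since the diff only shows the helpers, here is a minimal standalone sketch (not ClickHouse code; function names are invented for the example) of what position-independent encoding of array sizes buys: the ArraySizes substream stores per-array sizes rather than cumulative offsets, so a serialized block can be appended after data that already exists in the column.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

using Offset = uint64_t;

// Convert cumulative offsets into per-array sizes (what gets written).
std::vector<Offset> offsetsToSizes(const std::vector<Offset> & offsets)
{
    std::vector<Offset> sizes;
    Offset prev = 0;
    for (Offset off : offsets)
    {
        sizes.push_back(off - prev);   // size of one array
        prev = off;
    }
    return sizes;
}

// Rebuild cumulative offsets when reading, continuing from existing data.
std::vector<Offset> appendSizesAsOffsets(std::vector<Offset> offsets, const std::vector<Offset> & sizes)
{
    Offset current = offsets.empty() ? 0 : offsets.back();
    for (Offset size : sizes)
    {
        current += size;
        offsets.push_back(current);
    }
    return offsets;
}

int main()
{
    std::vector<Offset> offsets = {2, 2, 5};          // arrays of sizes 2, 0, 3
    std::vector<Offset> sizes = offsetsToSizes(offsets);
    assert((sizes == std::vector<Offset>{2, 0, 3}));

    // Deserializing the same sizes after existing data gives shifted offsets.
    std::vector<Offset> existing = {4};
    assert((appendSizesAsOffsets(existing, sizes) == std::vector<Offset>{4, 6, 6, 9}));
}
```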
void DataTypeArray::enumerateStreams(StreamCallback callback, SubstreamPath path) const
{
path.push_back(Substream::ArraySizes);
callback(path);
path.back() = Substream::ArrayElements;
nested->enumerateStreams(callback, path);
}
void DataTypeArray::serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const
{ {
const ColumnArray & column_array = typeid_cast<const ColumnArray &>(column); const ColumnArray & column_array = typeid_cast<const ColumnArray &>(column);
const ColumnArray::Offsets_t & offsets = column_array.getOffsets();
if (offset > offsets.size()) /// First serialize array sizes.
path.push_back(Substream::ArraySizes);
if (auto stream = getter(path))
{
if (position_independent_encoding)
serializeArraySizesPositionIndependent(column, *stream, offset, limit);
else
offsets->serializeBinaryBulk(*column_array.getOffsetsColumn(), *stream, offset, limit);
}
/// Then serialize contents of arrays.
path.back() = Substream::ArrayElements;
const ColumnArray::Offsets_t & offset_values = column_array.getOffsets();
if (offset > offset_values.size())
return; return;
/** offset - from which array to write. /** offset - from which array to write.
@ -123,82 +198,56 @@ void DataTypeArray::serializeBinaryBulk(const IColumn & column, WriteBuffer & os
* nested_limit - how many elements of the innards to write, or 0, if you write everything that is. * nested_limit - how many elements of the innards to write, or 0, if you write everything that is.
*/ */
size_t end = std::min(offset + limit, offsets.size()); size_t end = std::min(offset + limit, offset_values.size());
size_t nested_offset = offset ? offsets[offset - 1] : 0; size_t nested_offset = offset ? offset_values[offset - 1] : 0;
size_t nested_limit = limit size_t nested_limit = limit
? offsets[end - 1] - nested_offset ? offset_values[end - 1] - nested_offset
: 0; : 0;
if (limit == 0 || nested_limit) if (limit == 0 || nested_limit)
nested->serializeBinaryBulk(column_array.getData(), ostr, nested_offset, nested_limit); nested->serializeBinaryBulkWithMultipleStreams(column_array.getData(), getter, nested_offset, nested_limit, position_independent_encoding, path);
} }
void DataTypeArray::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double) const void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath path) const
{ {
ColumnArray & column_array = typeid_cast<ColumnArray &>(column); ColumnArray & column_array = typeid_cast<ColumnArray &>(column);
ColumnArray::Offsets_t & offsets = column_array.getOffsets();
path.push_back(Substream::ArraySizes);
if (auto stream = getter(path))
{
if (position_independent_encoding)
deserializeArraySizesPositionIndependent(column, *stream, limit);
else
offsets->deserializeBinaryBulk(*column_array.getOffsetsColumn(), *stream, limit, 0);
}
path.back() = Substream::ArrayElements;
ColumnArray::Offsets_t & offset_values = column_array.getOffsets();
IColumn & nested_column = column_array.getData(); IColumn & nested_column = column_array.getData();
/// Number of values corresponding with `offsets` must be read. /// Number of values corresponding with `offset_values` must be read.
size_t last_offset = (offsets.empty() ? 0 : offsets.back()); size_t last_offset = (offset_values.empty() ? 0 : offset_values.back());
if (last_offset < nested_column.size()) if (last_offset < nested_column.size())
throw Exception("Nested column is longer than last offset", ErrorCodes::LOGICAL_ERROR); throw Exception("Nested column is longer than last offset", ErrorCodes::LOGICAL_ERROR);
size_t nested_limit = last_offset - nested_column.size(); size_t nested_limit = last_offset - nested_column.size();
nested->deserializeBinaryBulk(nested_column, istr, nested_limit, 0); nested->deserializeBinaryBulkWithMultipleStreams(nested_column, getter, nested_limit, 0, position_independent_encoding, path);
if (column_array.getData().size() != last_offset) /// Check consistency between offsets and elements subcolumns.
/// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER.
if (!nested_column.empty() && nested_column.size() != last_offset)
throw Exception("Cannot read all array values", ErrorCodes::CANNOT_READ_ALL_DATA); throw Exception("Cannot read all array values", ErrorCodes::CANNOT_READ_ALL_DATA);
} }
void DataTypeArray::serializeOffsets(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
{
const ColumnArray & column_array = typeid_cast<const ColumnArray &>(column);
const ColumnArray::Offsets_t & offsets = column_array.getOffsets();
size_t size = offsets.size();
if (!size)
return;
size_t end = limit && (offset + limit < size)
? offset + limit
: size;
if (offset == 0)
{
writeIntBinary(offsets[0], ostr);
++offset;
}
for (size_t i = offset; i < end; ++i)
writeIntBinary(offsets[i] - offsets[i - 1], ostr);
}
void DataTypeArray::deserializeOffsets(IColumn & column, ReadBuffer & istr, size_t limit) const
{
ColumnArray & column_array = typeid_cast<ColumnArray &>(column);
ColumnArray::Offsets_t & offsets = column_array.getOffsets();
size_t initial_size = offsets.size();
offsets.resize(initial_size + limit);
size_t i = initial_size;
ColumnArray::Offset_t current_offset = initial_size ? offsets[initial_size - 1] : 0;
while (i < initial_size + limit && !istr.eof())
{
ColumnArray::Offset_t current_size = 0;
readIntBinary(current_size, istr);
current_offset += current_size;
offsets[i] = current_offset;
++i;
}
offsets.resize(i);
}
template <typename Writer> template <typename Writer>
static void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffer & ostr, Writer && write_nested) static void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffer & ostr, Writer && write_nested)
{ {

View File

@ -68,24 +68,27 @@ public:
/** Streaming serialization of arrays is arranged in a special way: /** Streaming serialization of arrays is arranged in a special way:
* - elements placed in a row are written/read without array sizes; * - elements placed in a row are written/read without array sizes;
* - the sizes are written/read in a separate column, * - the sizes are written/read in a separate stream,
* and the caller must take care of writing/reading the sizes.
* This is necessary, because when implementing nested structures, several arrays can have common sizes. * This is necessary, because when implementing nested structures, several arrays can have common sizes.
*/ */
/** Write only values, without dimensions. The caller also needs to record the offsets somewhere. */ void enumerateStreams(StreamCallback callback, SubstreamPath path) const override;
void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
/** Read only values, without dimensions. void serializeBinaryBulkWithMultipleStreams(
* In this case, all the sizes must already be read in the column beforehand. const IColumn & column,
*/ OutputStreamGetter getter,
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override; size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const override;
/** Write the dimensions. */ void deserializeBinaryBulkWithMultipleStreams(
void serializeOffsets(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const; IColumn & column,
InputStreamGetter getter,
/** Read the dimensions. Call this method before reading the values. */ size_t limit,
void deserializeOffsets(IColumn & column, ReadBuffer & istr, size_t limit) const; double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath path) const override;
ColumnPtr createColumn() const override; ColumnPtr createColumn() const override;

View File

@ -1,4 +1,5 @@
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnNullable.h> #include <Columns/ColumnNullable.h>
#include <IO/ReadBuffer.h> #include <IO/ReadBuffer.h>
@ -28,17 +29,54 @@ DataTypeNullable::DataTypeNullable(const DataTypePtr & nested_data_type_)
throw Exception("Nested type " + nested_data_type->getName() + " cannot be inside Nullable type", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); throw Exception("Nested type " + nested_data_type->getName() + " cannot be inside Nullable type", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
} }
void DataTypeNullable::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
void DataTypeNullable::enumerateStreams(StreamCallback callback, SubstreamPath path) const
{
path.push_back(Substream::NullMap);
callback(path);
path.back() = Substream::NullableElements;
nested_data_type->enumerateStreams(callback, path);
}
void DataTypeNullable::serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const
{ {
const ColumnNullable & col = static_cast<const ColumnNullable &>(column); const ColumnNullable & col = static_cast<const ColumnNullable &>(column);
col.checkConsistency(); col.checkConsistency();
nested_data_type->serializeBinaryBulk(*col.getNestedColumn(), ostr, offset, limit);
/// First serialize null map.
path.push_back(Substream::NullMap);
if (auto stream = getter(path))
DataTypeUInt8().serializeBinaryBulk(col.getNullMapConcreteColumn(), *stream, offset, limit);
/// Then serialize the nested column.
path.back() = Substream::NullableElements;
nested_data_type->serializeBinaryBulkWithMultipleStreams(*col.getNestedColumn(), getter, offset, limit, position_independent_encoding, path);
} }
void DataTypeNullable::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const
void DataTypeNullable::deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath path) const
{ {
ColumnNullable & col = static_cast<ColumnNullable &>(column); ColumnNullable & col = static_cast<ColumnNullable &>(column);
nested_data_type->deserializeBinaryBulk(*col.getNestedColumn(), istr, limit, avg_value_size_hint);
path.push_back(Substream::NullMap);
if (auto stream = getter(path))
DataTypeUInt8().deserializeBinaryBulk(col.getNullMapConcreteColumn(), *stream, limit, 0);
path.back() = Substream::NullableElements;
nested_data_type->deserializeBinaryBulkWithMultipleStreams(*col.getNestedColumn(), getter, limit, avg_value_size_hint, position_independent_encoding, path);
} }
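A minimal standalone sketch (illustrative names only, not the real ColumnNullable) of the layout the two substreams above produce: one stream of null-map bytes and one stream of nested values, with an arbitrary placeholder value wherever the row is NULL.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

struct NullableStreams
{
    std::vector<uint8_t> null_map;   // 1 = NULL, 0 = value present
    std::vector<int64_t> values;     // nested values, placeholder for NULL rows
};

NullableStreams serialize(const std::vector<std::optional<int64_t>> & rows)
{
    NullableStreams out;
    for (const auto & row : rows)
    {
        out.null_map.push_back(row ? 0 : 1);
        out.values.push_back(row.value_or(0));   // placeholder value for NULL
    }
    return out;
}

int main()
{
    auto streams = serialize({1, std::nullopt, 3});
    assert((streams.null_map == std::vector<uint8_t>{0, 1, 0}));
    assert((streams.values == std::vector<int64_t>{1, 0, 3}));
}
```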

View File

@ -25,9 +25,23 @@ public:
DataTypePtr clone() const override { return std::make_shared<DataTypeNullable>(nested_data_type->clone()); } DataTypePtr clone() const override { return std::make_shared<DataTypeNullable>(nested_data_type->clone()); }
/// Bulk serialization and deserialization is processing only nested columns. You should process null byte map separately. void enumerateStreams(StreamCallback callback, SubstreamPath path) const override;
void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override;
void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override; void serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const override;
void deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath path) const override;
void serializeBinary(const Field & field, WriteBuffer & ostr) const override { nested_data_type->serializeBinary(field, ostr); } void serializeBinary(const Field & field, WriteBuffer & ostr) const override { nested_data_type->serializeBinary(field, ostr); }
void deserializeBinary(Field & field, ReadBuffer & istr) const override { nested_data_type->deserializeBinary(field, istr); } void deserializeBinary(Field & field, ReadBuffer & istr) const override { nested_data_type->deserializeBinary(field, istr); }

View File

@ -1,12 +1,26 @@
#include <Columns/IColumn.h> #include <Columns/IColumn.h>
#include <Columns/ColumnConst.h> #include <Columns/ColumnConst.h>
#include <Common/Exception.h>
#include <Common/escapeForFileName.h>
#include <Core/Defines.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/IDataType.h> #include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeNested.h>
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int MULTIPLE_STREAMS_REQUIRED;
}
void IDataType::updateAvgValueSizeHint(const IColumn & column, double & avg_value_size_hint) void IDataType::updateAvgValueSizeHint(const IColumn & column, double & avg_value_size_hint)
{ {
/// Update the average value size hint if amount of read rows isn't too small /// Update the average value size hint if amount of read rows isn't too small
@ -30,6 +44,39 @@ ColumnPtr IDataType::createConstColumn(size_t size, const Field & field) const
return std::make_shared<ColumnConst>(column, size); return std::make_shared<ColumnConst>(column, size);
} }
void IDataType::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
{
throw Exception("Data type " + getName() + " must be serialized with multiple streams", ErrorCodes::MULTIPLE_STREAMS_REQUIRED);
}
void IDataType::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const
{
throw Exception("Data type " + getName() + " must be deserialized with multiple streams", ErrorCodes::MULTIPLE_STREAMS_REQUIRED);
}
String IDataType::getFileNameForStream(const String & column_name, const IDataType::SubstreamPath & path)
{
String nested_table_name = DataTypeNested::extractNestedTableName(column_name);
bool is_sizes_of_nested_type = !path.empty() && path.back().type == IDataType::Substream::ArraySizes
&& nested_table_name != column_name;
size_t array_level = 0;
String stream_name = escapeForFileName(is_sizes_of_nested_type ? nested_table_name : column_name);
for (const IDataType::Substream & elem : path)
{
if (elem.type == IDataType::Substream::NullMap)
stream_name += NULL_MAP_COLUMN_NAME_SUFFIX;
else if (elem.type == IDataType::Substream::ArraySizes)
stream_name = DataTypeNested::extractNestedTableName(stream_name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(array_level);
else if (elem.type == IDataType::Substream::ArrayElements)
++array_level;
}
return stream_name;
}
void IDataType::insertDefaultInto(IColumn & column) const void IDataType::insertDefaultInto(IColumn & column) const
{ {
column.insertDefault(); column.insertDefault();
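To see what getFileNameForStream produces for simple cases, here is a toy version (illustrative only: it ignores escapeForFileName and the Nested-table name extraction, so it only covers plain column names), reusing the ".size<level>" and ".null" suffixes defined above.

```cpp
#include <iostream>
#include <string>
#include <vector>

enum class SubstreamType { ArrayElements, ArraySizes, NullableElements, NullMap };

std::string fileNameForStream(const std::string & column_name, const std::vector<SubstreamType> & path)
{
    std::string stream_name = column_name;
    size_t array_level = 0;
    for (SubstreamType type : path)
    {
        if (type == SubstreamType::NullMap)
            stream_name += ".null";
        else if (type == SubstreamType::ArraySizes)
            stream_name += ".size" + std::to_string(array_level);
        else if (type == SubstreamType::ArrayElements)
            ++array_level;
    }
    return stream_name;
}

int main()
{
    // Array(UInt8) column "arr": sizes and elements live in separate files.
    std::cout << fileNameForStream("arr", {SubstreamType::ArraySizes}) << '\n';     // arr.size0
    std::cout << fileNameForStream("arr", {SubstreamType::ArrayElements}) << '\n';  // arr

    // Nullable(UInt8) column "x": the null map gets its own file.
    std::cout << fileNameForStream("x", {SubstreamType::NullMap}) << '\n';          // x.null
    std::cout << fileNameForStream("x", {SubstreamType::NullableElements}) << '\n'; // x
}
```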

View File

@ -66,18 +66,91 @@ public:
virtual DataTypePtr clone() const = 0; virtual DataTypePtr clone() const = 0;
/** Binary serialization for range of values in column - for writing to disk/network, etc. /** Binary serialization for range of values in column - for writing to disk/network, etc.
* 'offset' and 'limit' are used to specify range. *
* Some data types are represented in multiple streams while being serialized.
* Example:
* - Arrays are represented as stream of all elements and stream of array sizes.
* - Nullable types are represented as stream of values (with unspecified values in place of NULLs) and stream of NULL flags.
*
* Different streams are identified by "path".
* If the data type requires a single stream (true for most data types), the stream will have an empty path.
* Otherwise, the path can have components like "array elements", "array sizes", etc.
*
* For multidimensional arrays, the path can have arbitrary length.
* As an example, for 2-dimensional arrays of numbers we have at least three streams:
* - array sizes; (sizes of top level arrays)
* - array elements / array sizes; (sizes of second level (nested) arrays)
* - array elements / array elements; (the deepest elements, placed contiguously)
*
* Descendants must override either the serializeBinaryBulk/deserializeBinaryBulk methods (for simple cases with a single stream)
* or the serializeBinaryBulkWithMultipleStreams/deserializeBinaryBulkWithMultipleStreams/enumerateStreams methods (for cases with multiple streams).
*
* Default implementations of the ...WithMultipleStreams methods call serializeBinaryBulk/deserializeBinaryBulk for the single stream.
*/
struct Substream
{
enum Type
{
ArrayElements,
ArraySizes,
NullableElements,
NullMap,
};
Type type;
Substream(Type type) : type(type) {}
};
using SubstreamPath = std::vector<Substream>;
using StreamCallback = std::function<void(const SubstreamPath &)>;
virtual void enumerateStreams(StreamCallback callback, SubstreamPath path) const
{
callback(path);
}
using OutputStreamGetter = std::function<WriteBuffer*(const SubstreamPath &)>;
using InputStreamGetter = std::function<ReadBuffer*(const SubstreamPath &)>;
/** 'offset' and 'limit' are used to specify range.
* limit = 0 - means no limit. * limit = 0 - means no limit.
* offset must be not greater than size of column. * offset must be not greater than size of column.
* offset + limit could be greater than size of column * offset + limit could be greater than size of column
* - in that case, column is serialized to the end. * - in that case, column is serialized till the end.
*/ */
virtual void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const = 0; virtual void serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
OutputStreamGetter getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath path) const
{
if (WriteBuffer * stream = getter(path))
serializeBinaryBulk(column, *stream, offset, limit);
}
/** Read no more than limit values and append them into column. /** Read no more than limit values and append them into column.
* avg_value_size_hint - if not zero, may be used to avoid reallocations while reading column of String type. * avg_value_size_hint - if not zero, may be used to avoid reallocations while reading column of String type.
*/ */
virtual void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const = 0; virtual void deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
InputStreamGetter getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath path) const
{
if (ReadBuffer * stream = getter(path))
deserializeBinaryBulk(column, *stream, limit, avg_value_size_hint);
}
/** Override these methods for data types that require just a single stream (most data types).
*/
virtual void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const;
virtual void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const;
/** Serialization/deserialization of individual values. /** Serialization/deserialization of individual values.
* *
@ -176,6 +249,8 @@ public:
/// Updates avg_value_size_hint for newly read column. Uses to optimize deserialization. Zero expected for first column. /// Updates avg_value_size_hint for newly read column. Uses to optimize deserialization. Zero expected for first column.
static void updateAvgValueSizeHint(const IColumn & column, double & avg_value_size_hint); static void updateAvgValueSizeHint(const IColumn & column, double & avg_value_size_hint);
static String getFileNameForStream(const String & column_name, const SubstreamPath & path);
}; };
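A self-contained sketch of the recursion this interface enables (toy types with invented names, not the real IDataType API): each composite type appends its own substream to the path, invokes the callback, and recurses into its nested type, so Array(Nullable(T)) enumerates three streams.

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

enum class Substream { ArrayElements, ArraySizes, NullableElements, NullMap };
using SubstreamPath = std::vector<Substream>;
using StreamCallback = std::function<void(const SubstreamPath &)>;

struct IType
{
    virtual ~IType() = default;
    virtual void enumerateStreams(const StreamCallback & callback, SubstreamPath path) const { callback(path); }
};

struct ArrayType : IType
{
    std::shared_ptr<IType> nested;
    explicit ArrayType(std::shared_ptr<IType> nested_) : nested(std::move(nested_)) {}

    void enumerateStreams(const StreamCallback & callback, SubstreamPath path) const override
    {
        path.push_back(Substream::ArraySizes);
        callback(path);                              // the sizes stream
        path.back() = Substream::ArrayElements;
        nested->enumerateStreams(callback, path);    // streams of the elements
    }
};

struct NullableType : IType
{
    std::shared_ptr<IType> nested;
    explicit NullableType(std::shared_ptr<IType> nested_) : nested(std::move(nested_)) {}

    void enumerateStreams(const StreamCallback & callback, SubstreamPath path) const override
    {
        path.push_back(Substream::NullMap);
        callback(path);                              // the null map stream
        path.back() = Substream::NullableElements;
        nested->enumerateStreams(callback, path);
    }
};

static const char * name(Substream s)
{
    switch (s)
    {
        case Substream::ArrayElements:    return "array elements";
        case Substream::ArraySizes:       return "array sizes";
        case Substream::NullableElements: return "nullable elements";
        case Substream::NullMap:          return "null map";
    }
    return "";
}

int main()
{
    // Array(Nullable(T)) prints: "array sizes", "array elements / null map",
    // "array elements / nullable elements".
    ArrayType type(std::make_shared<NullableType>(std::make_shared<IType>()));
    type.enumerateStreams([](const SubstreamPath & path)
    {
        std::string line;
        for (Substream s : path)
            line += (line.empty() ? "" : " / ") + std::string(name(s));
        std::cout << line << '\n';
    }, {});
}
```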

View File

@ -38,7 +38,7 @@ try
WriteBufferFromFile out_buf("test"); WriteBufferFromFile out_buf("test");
stopwatch.restart(); stopwatch.restart();
data_type.serializeBinaryBulk(*column, out_buf, 0, 0); data_type.serializeBinaryBulkWithMultipleStreams(*column, [&](const IDataType::SubstreamPath &){ return &out_buf; }, 0, 0, true, {});
stopwatch.stop(); stopwatch.stop();
std::cout << "Writing, elapsed: " << stopwatch.elapsedSeconds() << std::endl; std::cout << "Writing, elapsed: " << stopwatch.elapsedSeconds() << std::endl;
@ -50,7 +50,7 @@ try
ReadBufferFromFile in_buf("test"); ReadBufferFromFile in_buf("test");
stopwatch.restart(); stopwatch.restart();
data_type.deserializeBinaryBulk(*column, in_buf, n, 0); data_type.deserializeBinaryBulkWithMultipleStreams(*column, [&](const IDataType::SubstreamPath &){ return &in_buf; }, n, 0, true, {});
stopwatch.stop(); stopwatch.stop();
std::cout << "Reading, elapsed: " << stopwatch.elapsedSeconds() << std::endl; std::cout << "Reading, elapsed: " << stopwatch.elapsedSeconds() << std::endl;

View File

@ -27,7 +27,7 @@ int main(int argc, char ** argv)
WriteBufferFromOStream out_buf(ostr); WriteBufferFromOStream out_buf(ostr);
stopwatch.restart(); stopwatch.restart();
data_type.serializeBinaryBulk(*column, out_buf, 0, 0); data_type.serializeBinaryBulkWithMultipleStreams(*column, [&](const IDataType::SubstreamPath &){ return &out_buf; }, 0, 0, true, {});
stopwatch.stop(); stopwatch.stop();
std::cout << "Elapsed: " << stopwatch.elapsedSeconds() << std::endl; std::cout << "Elapsed: " << stopwatch.elapsedSeconds() << std::endl;

View File

@ -9,7 +9,7 @@ namespace DB
/** Base class for ReadBuffer and WriteBuffer. /** Base class for ReadBuffer and WriteBuffer.
* Contains mutual types, variables, and functions. * Contains common types, variables, and functions.
* *
* ReadBuffer and WriteBuffer are similar to istream and ostream, respectively. * ReadBuffer and WriteBuffer are similar to istream and ostream, respectively.
* They have to be used, because using iostreams it is impossible to effectively implement some operations. * They have to be used, because using iostreams it is impossible to effectively implement some operations.

View File

@ -108,12 +108,12 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration); rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration);
} }
size_t unread_rows_in_current_granule = reader.unreadRowsInCurrentGranule(); size_t unread_rows_in_current_granule = reader.numPendingRowsInCurrentGranule();
if (unread_rows_in_current_granule >= rows_to_read) if (unread_rows_in_current_granule >= rows_to_read)
return rows_to_read; return rows_to_read;
size_t granule_to_read = (rows_to_read + reader.readRowsInCurrentGranule() + index_granularity / 2) / index_granularity; size_t granule_to_read = (rows_to_read + reader.numReadRowsInCurrentGranule() + index_granularity / 2) / index_granularity;
return index_granularity * granule_to_read - reader.readRowsInCurrentGranule(); return index_granularity * granule_to_read - reader.numReadRowsInCurrentGranule();
}; };
// read rows from reader and clear columns // read rows from reader and clear columns
@ -196,7 +196,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (!pre_range_reader) if (!pre_range_reader)
processNextRange(*task, *pre_reader); processNextRange(*task, *pre_reader);
size_t rows_to_read = std::min(pre_range_reader->unreadRows(), space_left); size_t rows_to_read = std::min(pre_range_reader->numPendingRows(), space_left);
size_t read_rows = pre_range_reader->read(res, rows_to_read); size_t read_rows = pre_range_reader->read(res, rows_to_read);
rows_was_read_in_last_range += read_rows; rows_was_read_in_last_range += read_rows;
if (pre_range_reader->isReadingFinished()) if (pre_range_reader->isReadingFinished())
@ -263,7 +263,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (task->number_of_rows_to_skip) if (task->number_of_rows_to_skip)
skipRows(res, *task->current_range_reader, *task, task->number_of_rows_to_skip); skipRows(res, *task->current_range_reader, *task, task->number_of_rows_to_skip);
size_t rows_to_read = ranges_to_read.empty() size_t rows_to_read = ranges_to_read.empty()
? rows_was_read_in_last_range : task->current_range_reader->unreadRows(); ? rows_was_read_in_last_range : task->current_range_reader->numPendingRows();
task->current_range_reader->read(res, rows_to_read); task->current_range_reader->read(res, rows_to_read);
} }
@ -272,7 +272,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
const auto & range = ranges_to_read[range_idx]; const auto & range = ranges_to_read[range_idx];
task->current_range_reader = reader->readRange(range.begin, range.end); task->current_range_reader = reader->readRange(range.begin, range.end);
size_t rows_to_read = range_idx + 1 == ranges_to_read.size() size_t rows_to_read = range_idx + 1 == ranges_to_read.size()
? rows_was_read_in_last_range : task->current_range_reader->unreadRows(); ? rows_was_read_in_last_range : task->current_range_reader->numPendingRows();
task->current_range_reader->read(res, rows_to_read); task->current_range_reader->read(res, rows_to_read);
} }
@ -310,7 +310,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
/// Now we need to read the same number of rows as in prewhere. /// Now we need to read the same number of rows as in prewhere.
size_t rows_to_read = next_range_idx == ranges_to_read.size() size_t rows_to_read = next_range_idx == ranges_to_read.size()
? rows_was_read_in_last_range : (task->current_range_reader->unreadRows() - number_of_rows_to_skip); ? rows_was_read_in_last_range : (task->current_range_reader->numPendingRows() - number_of_rows_to_skip);
auto readRows = [&]() auto readRows = [&]()
{ {
@ -338,7 +338,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
{ {
auto rows_should_be_copied = pre_filter_pos - pre_filter_begin_pos; auto rows_should_be_copied = pre_filter_pos - pre_filter_begin_pos;
auto range_reader_with_skipped_rows = range_reader.getFutureState(number_of_rows_to_skip + rows_should_be_copied); auto range_reader_with_skipped_rows = range_reader.getFutureState(number_of_rows_to_skip + rows_should_be_copied);
auto unread_rows_in_current_granule = range_reader_with_skipped_rows.unreadRowsInCurrentGranule(); auto unread_rows_in_current_granule = range_reader_with_skipped_rows.numPendingRowsInCurrentGranule();
const size_t limit = std::min(pre_filter.size(), pre_filter_pos + unread_rows_in_current_granule); const size_t limit = std::min(pre_filter.size(), pre_filter_pos + unread_rows_in_current_granule);
bool will_read_until_mark = unread_rows_in_current_granule == limit - pre_filter_pos; bool will_read_until_mark = unread_rows_in_current_granule == limit - pre_filter_pos;
@ -424,8 +424,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
else else
throw Exception{ throw Exception{
"Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.", "Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.",
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER};
};
if (res) if (res)
{ {

View File

@ -1,4 +1,5 @@
#pragma once #pragma once
#include <DataStreams/IProfilingBlockInputStream.h> #include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h> #include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>

View File

@ -546,7 +546,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end()); column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
res = spreadMarkRangesAmongStreamsFinal( res = spreadMarkRangesAmongStreamsFinal(
parts_with_ranges, std::move(parts_with_ranges),
column_names_to_read, column_names_to_read,
max_block_size, max_block_size,
settings.use_uncompressed_cache, settings.use_uncompressed_cache,
@ -559,7 +559,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
else else
{ {
res = spreadMarkRangesAmongStreams( res = spreadMarkRangesAmongStreams(
parts_with_ranges, std::move(parts_with_ranges),
num_streams, num_streams,
column_names_to_read, column_names_to_read,
max_block_size, max_block_size,
@ -585,7 +585,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
RangesInDataParts parts, RangesInDataParts && parts,
size_t num_streams, size_t num_streams,
const Names & column_names, const Names & column_names,
size_t max_block_size, size_t max_block_size,
@ -605,7 +605,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
size_t sum_marks = 0; size_t sum_marks = 0;
for (size_t i = 0; i < parts.size(); ++i) for (size_t i = 0; i < parts.size(); ++i)
{ {
/// Let the segments be listed from right to left so that the leftmost segment can be dropped using `pop_back()`. /// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`.
std::reverse(parts[i].ranges.begin(), parts[i].ranges.end()); std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());
for (const auto & range : parts[i].ranges) for (const auto & range : parts[i].ranges)
@ -655,10 +655,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
{ {
size_t need_marks = min_marks_per_stream; size_t need_marks = min_marks_per_stream;
/// Loop parts. /// Loop over parts.
/// We will iteratively take part or some subrange of a part from the back
/// and assign a stream to read from it.
while (need_marks > 0 && !parts.empty()) while (need_marks > 0 && !parts.empty())
{ {
RangesInDataPart & part = parts.back(); RangesInDataPart part = parts.back();
size_t & marks_in_part = sum_marks_in_parts.back(); size_t & marks_in_part = sum_marks_in_parts.back();
/// We will not take too few rows from a part. /// We will not take too few rows from a part.
@ -687,7 +689,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
} }
else else
{ {
/// Cycle through segments of a part. /// Loop through ranges in part. Take enough ranges to cover "need_marks".
while (need_marks > 0) while (need_marks > 0)
{ {
if (part.ranges.empty()) if (part.ranges.empty())
@ -725,7 +727,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
} }
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts parts, RangesInDataParts && parts,
const Names & column_names, const Names & column_names,
size_t max_block_size, size_t max_block_size,
bool use_uncompressed_cache, bool use_uncompressed_cache,

View File

@ -36,7 +36,7 @@ private:
Logger * log; Logger * log;
BlockInputStreams spreadMarkRangesAmongStreams( BlockInputStreams spreadMarkRangesAmongStreams(
RangesInDataParts parts, RangesInDataParts && parts,
size_t num_streams, size_t num_streams,
const Names & column_names, const Names & column_names,
size_t max_block_size, size_t max_block_size,
@ -47,7 +47,7 @@ private:
const Settings & settings) const; const Settings & settings) const;
BlockInputStreams spreadMarkRangesAmongStreamsFinal( BlockInputStreams spreadMarkRangesAmongStreamsFinal(
RangesInDataParts parts, RangesInDataParts && parts,
const Names & column_names, const Names & column_names,
size_t max_block_size, size_t max_block_size,
bool use_uncompressed_cache, bool use_uncompressed_cache,

View File

@ -74,7 +74,6 @@ public:
size_t read(size_t rows) size_t read(size_t rows)
{ {
ColumnPtr column = type->createColumn(); ColumnPtr column = type->createColumn();
type->deserializeBinaryBulk(*column, uncompressed_hashing_buf, rows, 0);
return column->size(); return column->size();
} }

View File

@ -6,13 +6,13 @@ namespace DB
MergeTreeRangeReader::MergeTreeRangeReader( MergeTreeRangeReader::MergeTreeRangeReader(
MergeTreeReader & merge_tree_reader, size_t from_mark, size_t to_mark, size_t index_granularity) MergeTreeReader & merge_tree_reader, size_t from_mark, size_t to_mark, size_t index_granularity)
: merge_tree_reader(merge_tree_reader), current_mark(from_mark), last_mark(to_mark) : merge_tree_reader(merge_tree_reader), current_mark(from_mark), last_mark(to_mark)
, read_rows_after_current_mark(0), index_granularity(index_granularity), continue_reading(false), is_reading_finished(false) , index_granularity(index_granularity)
{ {
} }
size_t MergeTreeRangeReader::skipToNextMark() size_t MergeTreeRangeReader::skipToNextMark()
{ {
auto unread_rows_in_current_part = unreadRowsInCurrentGranule(); auto unread_rows_in_current_part = numPendingRowsInCurrentGranule();
continue_reading = false; continue_reading = false;
++current_mark; ++current_mark;
if (current_mark == last_mark) if (current_mark == last_mark)
@ -33,10 +33,10 @@ MergeTreeRangeReader MergeTreeRangeReader::getFutureState(size_t rows_to_read) c
size_t MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read) size_t MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read)
{ {
size_t rows_to_read = unreadRows(); size_t rows_to_read = numPendingRows();
rows_to_read = std::min(rows_to_read, max_rows_to_read); rows_to_read = std::min(rows_to_read, max_rows_to_read);
if (rows_to_read == 0) if (rows_to_read == 0)
return 0; throw Exception("Logical error: 0 rows to read.", ErrorCodes::LOGICAL_ERROR);
auto read_rows = merge_tree_reader.get().readRows(current_mark, continue_reading, rows_to_read, res); auto read_rows = merge_tree_reader.get().readRows(current_mark, continue_reading, rows_to_read, res);

View File

@ -13,10 +13,10 @@ class MergeTreeReader;
class MergeTreeRangeReader class MergeTreeRangeReader
{ {
public: public:
size_t unreadRows() const { return (last_mark - current_mark) * index_granularity - read_rows_after_current_mark; } size_t numPendingRows() const { return (last_mark - current_mark) * index_granularity - read_rows_after_current_mark; }
size_t unreadRowsInCurrentGranule() const { return index_granularity - read_rows_after_current_mark; } size_t numPendingRowsInCurrentGranule() const { return index_granularity - read_rows_after_current_mark; }
size_t readRowsInCurrentGranule() const { return read_rows_after_current_mark; } size_t numReadRowsInCurrentGranule() const { return read_rows_after_current_mark; }
/// Seek to next mark before next reading. /// Seek to next mark before next reading.
size_t skipToNextMark(); size_t skipToNextMark();
@ -41,10 +41,10 @@ private:
std::reference_wrapper<MergeTreeReader> merge_tree_reader; std::reference_wrapper<MergeTreeReader> merge_tree_reader;
size_t current_mark; size_t current_mark;
size_t last_mark; size_t last_mark;
size_t read_rows_after_current_mark; size_t read_rows_after_current_mark = 0;
size_t index_granularity; size_t index_granularity;
bool continue_reading; bool continue_reading = false;
bool is_reading_finished; bool is_reading_finished = false;
friend class MergeTreeReader; friend class MergeTreeReader;
}; };
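A tiny worked example (made-up numbers) of the bookkeeping behind the renamed accessors above: with a mark every index_granularity rows, the pending row count is everything between the current read position and the last mark.

```cpp
#include <cassert>
#include <cstddef>

int main()
{
    const size_t index_granularity = 8192;
    size_t current_mark = 2;
    size_t last_mark = 5;
    size_t read_rows_after_current_mark = 100;

    // numPendingRows(): rows left in the whole range being read.
    size_t pending_total = (last_mark - current_mark) * index_granularity - read_rows_after_current_mark;
    // numPendingRowsInCurrentGranule(): rows left before the next mark.
    size_t pending_in_granule = index_granularity - read_rows_after_current_mark;

    assert(pending_total == 3 * 8192 - 100);
    assert(pending_in_granule == 8192 - 100);
}
```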

View File

@ -22,12 +22,6 @@ namespace
using OffsetColumns = std::map<std::string, ColumnPtr>; using OffsetColumns = std::map<std::string, ColumnPtr>;
constexpr auto DATA_FILE_EXTENSION = ".bin"; constexpr auto DATA_FILE_EXTENSION = ".bin";
constexpr auto NULL_MAP_EXTENSION = ".null.bin";
bool isNullStream(const std::string & extension)
{
return extension == NULL_MAP_EXTENSION;
}
} }
namespace ErrorCodes namespace ErrorCodes
@ -58,7 +52,7 @@ MergeTreeReader::MergeTreeReader(const String & path,
throw Exception("Part " + path + " is missing", ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART); throw Exception("Part " + path + " is missing", ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
for (const NameAndTypePair & column : columns) for (const NameAndTypePair & column : columns)
addStream(column.name, *column.type, all_mark_ranges, profile_callback, clock_type); addStreams(column.name, *column.type, all_mark_ranges, profile_callback, clock_type);
} }
catch (...) catch (...)
{ {
@ -93,9 +87,6 @@ size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t
for (const NameAndTypePair & it : columns) for (const NameAndTypePair & it : columns)
{ {
if (streams.end() == streams.find(it.name))
continue;
/// The column is already present in the block so we will append the values to the end. /// The column is already present in the block so we will append the values to the end.
bool append = res.has(it.name); bool append = res.has(it.name);
@ -145,8 +136,13 @@ size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t
try try
{ {
size_t column_size_before_reading = column.column->size(); size_t column_size_before_reading = column.column->size();
readData(column.name, *column.type, *column.column, from_mark, continue_reading, max_rows_to_read, 0, read_offsets);
read_rows = std::max(read_rows, column.column->size() - column_size_before_reading); readData(column.name, *column.type, *column.column, from_mark, continue_reading, max_rows_to_read, read_offsets);
/// For elements of Nested, column_size_before_reading may be greater than column size
/// if offsets are not empty and were already read, but elements are empty.
if (column.column->size())
read_rows = std::max(read_rows, column.column->size() - column_size_before_reading);
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -272,12 +268,6 @@ MergeTreeReader::Stream::Stream(
} }
} }
std::unique_ptr<MergeTreeReader::Stream> MergeTreeReader::Stream::createEmptyPtr()
{
std::unique_ptr<Stream> res(new Stream);
res->is_empty = true;
return res;
}
const MarkInCompressedFile & MergeTreeReader::Stream::getMark(size_t index) const MarkInCompressedFile & MergeTreeReader::Stream::getMark(size_t index)
{ {
@ -286,14 +276,10 @@ const MarkInCompressedFile & MergeTreeReader::Stream::getMark(size_t index)
return (*marks)[index]; return (*marks)[index];
} }
void MergeTreeReader::Stream::loadMarks() void MergeTreeReader::Stream::loadMarks()
{ {
std::string path; std::string path = path_prefix + ".mrk";
if (isNullStream(extension))
path = path_prefix + ".null.mrk";
else
path = path_prefix + ".mrk";
auto load = [&]() -> MarkCache::MappedPtr auto load = [&]() -> MarkCache::MappedPtr
{ {
@ -304,8 +290,8 @@ void MergeTreeReader::Stream::loadMarks()
size_t expected_file_size = sizeof(MarkInCompressedFile) * marks_count; size_t expected_file_size = sizeof(MarkInCompressedFile) * marks_count;
if (expected_file_size != file_size) if (expected_file_size != file_size)
throw Exception( throw Exception(
"bad size of marks file `" + path + "':" + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size), "bad size of marks file `" + path + "':" + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
ErrorCodes::CORRUPTED_DATA); ErrorCodes::CORRUPTED_DATA);
auto res = std::make_shared<MarksInCompressedFile>(marks_count); auto res = std::make_shared<MarksInCompressedFile>(marks_count);
@ -365,167 +351,94 @@ void MergeTreeReader::Stream::seekToMark(size_t index)
} }
void MergeTreeReader::addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, void MergeTreeReader::addStreams(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type, const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
size_t level)
{ {
String escaped_column_name = escapeForFileName(name); IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type);
bool data_file_exists = Poco::File(path + escaped_column_name + DATA_FILE_EXTENSION).exists();
bool is_column_of_nested_type = type_arr && level == 0 && DataTypeNested::extractNestedTableName(name) != name;
/** If data file is missing then we will not try to open it.
* It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
* But we should try to load offset data for array columns of Nested subtable (their data will be filled by default value).
*/
if (!data_file_exists && !is_column_of_nested_type)
return;
if (type.isNullable())
{ {
/// First create the stream that handles the null map of the given column. String stream_name = IDataType::getFileNameForStream(name, substream_path);
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *nullable_type.getNestedType();
std::string filename = name + NULL_MAP_EXTENSION; if (streams.count(stream_name))
streams.emplace(filename, std::make_unique<Stream>(
path + escaped_column_name, NULL_MAP_EXTENSION, data_part->marks_count,
all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
/// Then create the stream that handles the data of the given column.
addStream(name, nested_type, all_mark_ranges, profile_callback, clock_type, level);
}
/// For arrays separate streams for sizes are used.
else if (type_arr)
{
String size_name = DataTypeNested::extractNestedTableName(name)
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String escaped_size_name = escapeForFileName(DataTypeNested::extractNestedTableName(name))
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String size_path = path + escaped_size_name + DATA_FILE_EXTENSION;
/// We have neither offsets nor data -> skipping, default values will be filled after
if (!data_file_exists && !Poco::File(size_path).exists())
return; return;
if (!streams.count(size_name)) bool data_file_exists = Poco::File(path + stream_name + DATA_FILE_EXTENSION).exists();
streams.emplace(size_name, std::make_unique<Stream>(
path + escaped_size_name, DATA_FILE_EXTENSION, data_part->marks_count,
all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
if (data_file_exists) /** If data file is missing then we will not try to open it.
addStream(name, *type_arr->getNestedType(), all_mark_ranges, profile_callback, clock_type, level + 1); * It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
else */
streams.emplace(name, Stream::createEmptyPtr()); if (!data_file_exists)
} return;
else
streams.emplace(name, std::make_unique<Stream>( streams.emplace(stream_name, std::make_unique<Stream>(
path + escaped_column_name, DATA_FILE_EXTENSION, data_part->marks_count, path + stream_name, DATA_FILE_EXTENSION, data_part->marks_count,
all_mark_ranges, mark_cache, save_marks_in_cache, all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type)); uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
};
type.enumerateStreams(callback, {});
} }
void MergeTreeReader::readData( void MergeTreeReader::readData(
const String & name, const IDataType & type, IColumn & column, const String & name, const IDataType & type, IColumn & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read, size_t from_mark, bool continue_reading, size_t max_rows_to_read,
size_t level, bool read_offsets) bool with_offsets)
{ {
if (type.isNullable()) IDataType::InputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
{ {
/// First read from the null map. /// If offsets for arrays have already been read.
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type); if (!with_offsets && !path.empty() && path.back().type == IDataType::Substream::ArraySizes)
const IDataType & nested_type = *nullable_type.getNestedType(); return nullptr;
ColumnNullable & nullable_col = static_cast<ColumnNullable &>(column); String stream_name = IDataType::getFileNameForStream(name, path);
IColumn & nested_col = *nullable_col.getNestedColumn();
std::string filename = name + NULL_MAP_EXTENSION; auto it = streams.find(stream_name);
if (it == streams.end())
return nullptr;
Stream & stream = *it->second;
Stream & stream = *(streams.at(filename));
if (!continue_reading) if (!continue_reading)
stream.seekToMark(from_mark); stream.seekToMark(from_mark);
IColumn & col8 = nullable_col.getNullMapConcreteColumn();
DataTypeUInt8{}.deserializeBinaryBulk(col8, *stream.data_buffer, max_rows_to_read, 0);
/// Then read data. return stream.data_buffer;
readData(name, nested_type, nested_col, from_mark, continue_reading, max_rows_to_read, level, read_offsets); };
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/// For arrays the sizes must be deserialized first, then the values.
if (read_offsets)
{
Stream & stream = *streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)];
if (!continue_reading)
stream.seekToMark(from_mark);
type_arr->deserializeOffsets(
column,
*stream.data_buffer,
max_rows_to_read);
}
ColumnArray & array = typeid_cast<ColumnArray &>(column); double & avg_value_size_hint = avg_value_size_hints[name];
const size_t required_internal_size = array.getOffsets().size() ? array.getOffsets()[array.getOffsets().size() - 1] : 0; type.deserializeBinaryBulkWithMultipleStreams(column, stream_getter, max_rows_to_read, avg_value_size_hint, true, {});
IDataType::updateAvgValueSizeHint(column, avg_value_size_hint);
}
readData(
name,
*type_arr->getNestedType(),
array.getData(),
from_mark, continue_reading, required_internal_size - array.getData().size(),
level + 1);
size_t read_internal_size = array.getData().size(); static bool arrayHasNoElementsRead(const IColumn & column)
{
const ColumnArray * column_array = typeid_cast<const ColumnArray *>(&column);
/// Fix for erroneously written empty files with array data. if (!column_array)
/// This can happen after ALTER that adds new columns to nested data structures. return false;
if (required_internal_size != read_internal_size)
{
if (read_internal_size != 0)
LOG_ERROR(&Logger::get("MergeTreeReader"),
"Internal size of array " + name + " doesn't match offsets: corrupted data, filling with default values.");
array.getDataPtr() = type_arr->getNestedType()->createConstColumn( size_t size = column_array->size();
required_internal_size, if (!size)
type_arr->getNestedType()->getDefault())->convertToFullColumnIfConst(); return false;
/// NOTE: we could zero this column so that it won't get added to the block size_t data_size = column_array->getData().size();
/// and later be recreated with more correct default values (from the table definition). if (data_size)
} return false;
}
else
{
Stream & stream = *streams[name];
/// It means that data column of array column will be empty, and it will be replaced by const data column size_t last_offset = column_array->getOffsets()[size - 1];
if (stream.isEmpty()) return last_offset != 0;
return;
double & avg_value_size_hint = avg_value_size_hints[name];
if (!continue_reading)
stream.seekToMark(from_mark);
type.deserializeBinaryBulk(column, *stream.data_buffer, max_rows_to_read, avg_value_size_hint);
IDataType::updateAvgValueSizeHint(column, avg_value_size_hint);
}
} }
void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_names, bool always_reorder) void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_names, bool always_reorder)
{ {
if (!res) if (!res)
throw Exception("Empty block passed to fillMissingColumnsImpl", ErrorCodes::LOGICAL_ERROR); throw Exception("Empty block passed to fillMissingColumns", ErrorCodes::LOGICAL_ERROR);
try try
{ {
/// For a missing column of a nested data structure we must create not a column of empty /// For a missing column of a nested data structure we must create not a column of empty
/// arrays, but a column of arrays of correct length. /// arrays, but a column of arrays of correct length.
/// TODO: If for some nested data structure only missing columns were selected, the arrays in these columns will be empty,
/// even if the offsets for this nested structure are present in the current part. This can be fixed.
/// NOTE: Similar, but slightly different code is present in Block::addDefaults. /// NOTE: Similar, but slightly different code is present in Block::addDefaults.
/// First, collect offset columns for all arrays in the block. /// First, collect offset columns for all arrays in the block.
@ -534,23 +447,9 @@ void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_name
{ {
const ColumnWithTypeAndName & column = res.safeGetByPosition(i); const ColumnWithTypeAndName & column = res.safeGetByPosition(i);
IColumn * observed_column; if (const ColumnArray * array = typeid_cast<const ColumnArray *>(column.column.get()))
std::string column_name;
if (column.column->isNullable())
{ {
ColumnNullable & nullable_col = static_cast<ColumnNullable &>(*(column.column)); String offsets_name = DataTypeNested::extractNestedTableName(column.name);
observed_column = nullable_col.getNestedColumn().get();
column_name = observed_column->getName();
}
else
{
observed_column = column.column.get();
column_name = column.name;
}
if (const ColumnArray * array = typeid_cast<const ColumnArray *>(observed_column))
{
String offsets_name = DataTypeNested::extractNestedTableName(column_name);
auto & offsets_column = offset_columns[offsets_name]; auto & offsets_column = offset_columns[offsets_name];
/// If for some reason multiple offsets columns are present for the same nested data structure, /// If for some reason multiple offsets columns are present for the same nested data structure,
@ -560,13 +459,26 @@ void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_name
} }
} }
auto should_evaluate_defaults = false; bool should_evaluate_defaults = false;
auto should_sort = always_reorder; bool should_sort = always_reorder;
size_t rows = res.rows();
/// insert default values only for columns without default expressions
for (const auto & requested_column : columns) for (const auto & requested_column : columns)
{ {
/// insert default values only for columns without default expressions bool has_column = res.has(requested_column.name);
if (!res.has(requested_column.name)) if (has_column)
{
const auto & col = *res.getByName(requested_column.name).column;
if (arrayHasNoElementsRead(col))
{
res.erase(requested_column.name);
has_column = false;
}
}
if (!has_column)
{ {
should_sort = true; should_sort = true;
if (storage.column_defaults.count(requested_column.name) != 0) if (storage.column_defaults.count(requested_column.name) != 0)
@ -597,7 +509,7 @@ void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_name
/// We must turn a constant column into a full column because the interpreter could infer that it is constant everywhere /// We must turn a constant column into a full column because the interpreter could infer that it is constant everywhere
/// but in some blocks (from other parts) it can be a full column. /// but in some blocks (from other parts) it can be a full column.
column_to_add.column = column_to_add.type->createConstColumn( column_to_add.column = column_to_add.type->createConstColumn(
res.rows(), column_to_add.type->getDefault())->convertToFullColumnIfConst(); rows, column_to_add.type->getDefault())->convertToFullColumnIfConst();
} }
res.insert(std::move(column_to_add)); res.insert(std::move(column_to_add));

View File

@ -58,12 +58,8 @@ private:
size_t aio_threshold, size_t max_read_buffer_size, size_t aio_threshold, size_t max_read_buffer_size,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type); const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
static std::unique_ptr<Stream> createEmptyPtr();
void seekToMark(size_t index); void seekToMark(size_t index);
bool isEmpty() const { return is_empty; }
ReadBuffer * data_buffer; ReadBuffer * data_buffer;
private: private:
@ -85,8 +81,6 @@ private:
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer; std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer; std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
bool is_empty = false;
}; };
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>; using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
@ -111,14 +105,13 @@ private:
size_t aio_threshold; size_t aio_threshold;
size_t max_read_buffer_size; size_t max_read_buffer_size;
void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, void addStreams(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type, const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
size_t level = 0);
void readData( void readData(
const String & name, const IDataType & type, IColumn & column, const String & name, const IDataType & type, IColumn & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read, size_t from_mark, bool continue_reading, size_t max_rows_to_read,
size_t level = 0, bool read_offsets = true); bool read_offsets = true);
/// Return the number of rows has been read or zero if there is no columns to read. /// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark

View File

@ -20,8 +20,6 @@ namespace
constexpr auto DATA_FILE_EXTENSION = ".bin"; constexpr auto DATA_FILE_EXTENSION = ".bin";
constexpr auto MARKS_FILE_EXTENSION = ".mrk"; constexpr auto MARKS_FILE_EXTENSION = ".mrk";
constexpr auto NULL_MAP_EXTENSION = ".null.bin";
constexpr auto NULL_MARKS_FILE_EXTENSION = ".null.mrk";
} }
@ -42,189 +40,46 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
} }
void IMergedBlockOutputStream::addStream( void IMergedBlockOutputStream::addStreams(
const String & path, const String & path,
const String & name, const String & name,
const IDataType & type, const IDataType & type,
size_t estimated_size, size_t estimated_size,
size_t level,
const String & filename,
bool skip_offsets) bool skip_offsets)
{ {
String escaped_column_name; IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
if (filename.size())
escaped_column_name = escapeForFileName(filename);
else
escaped_column_name = escapeForFileName(name);
if (type.isNullable())
{ {
/// First create the stream that handles the null map of the given column. if (skip_offsets && !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type); return;
const IDataType & nested_type = *nullable_type.getNestedType();
std::string null_map_name = name + NULL_MAP_EXTENSION; String stream_name = IDataType::getFileNameForStream(name, substream_path);
column_streams[null_map_name] = std::make_unique<ColumnStream>(
escaped_column_name, /// Shared offsets for Nested type.
path + escaped_column_name, NULL_MAP_EXTENSION, if (column_streams.count(stream_name))
path + escaped_column_name, NULL_MARKS_FILE_EXTENSION, return;
column_streams[stream_name] = std::make_unique<ColumnStream>(
stream_name,
path + stream_name, DATA_FILE_EXTENSION,
path + stream_name, MARKS_FILE_EXTENSION,
max_compress_block_size, max_compress_block_size,
compression_settings, compression_settings,
estimated_size, estimated_size,
aio_threshold); aio_threshold);
};
/// Then create the stream that handles the data of the given column. type.enumerateStreams(callback, {});
addStream(path, name, nested_type, estimated_size, level, filename, false);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
if (!skip_offsets)
{
/// For arrays, separate files are used for sizes.
String size_name = DataTypeNested::extractNestedTableName(name)
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String escaped_size_name = escapeForFileName(DataTypeNested::extractNestedTableName(name))
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
column_streams[size_name] = std::make_unique<ColumnStream>(
escaped_size_name,
path + escaped_size_name, DATA_FILE_EXTENSION,
path + escaped_size_name, MARKS_FILE_EXTENSION,
max_compress_block_size,
compression_settings,
estimated_size,
aio_threshold);
}
addStream(path, name, *type_arr->getNestedType(), estimated_size, level + 1, "", false);
}
else
{
column_streams[name] = std::make_unique<ColumnStream>(
escaped_column_name,
path + escaped_column_name, DATA_FILE_EXTENSION,
path + escaped_column_name, MARKS_FILE_EXTENSION,
max_compress_block_size,
compression_settings,
estimated_size,
aio_threshold);
}
} }
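addStreams now derives everything from the type itself: enumerateStreams visits each substream and getFileNameForStream maps the (column name, substream path) pair to the on-disk stream name, so one compound column fans out into several ColumnStreams. A small illustrative helper, not part of the commit, that prints this fan-out:

    #include <Core/Types.h>
    #include <DataTypes/IDataType.h>
    #include <iostream>

    /// Prints the stream names a column of the given type is split into.
    void printStreamNames(const DB::String & column_name, const DB::IDataType & type)
    {
        type.enumerateStreams([&] (const DB::IDataType::SubstreamPath & substream_path)
        {
            std::cout << DB::IDataType::getFileNameForStream(column_name, substream_path) << '\n';
        }, {});
    }

For a hypothetical column n.values of type Array(Nullable(UInt8)) inside a Nested structure n, the expected names are roughly n.size0 (array sizes shared by the whole Nested structure), n.values.null (null map of the elements) and n.values (the elements themselves), modulo file-name escaping.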
void IMergedBlockOutputStream::writeData( void IMergedBlockOutputStream::writeData(
const String & name, const String & name,
const DataTypePtr & type, const IDataType & type,
const ColumnPtr & column, const IColumn & column,
OffsetColumns & offset_columns, OffsetColumns & offset_columns,
size_t level,
bool skip_offsets) bool skip_offsets)
{ {
writeDataImpl(name, type, column, nullptr, offset_columns, level, skip_offsets); size_t size = column.size();
}
void IMergedBlockOutputStream::writeDataImpl(
const String & name,
const DataTypePtr & type,
const ColumnPtr & column,
const ColumnPtr & offsets,
OffsetColumns & offset_columns,
size_t level,
bool skip_offsets)
{
/// NOTE: the parameter write_array_data indicates whether we call this method
/// to write the contents of an array. This is to cope with the fact that
/// serialization of arrays for the MergeTree engine slightly differs from
/// what the other engines do.
if (type->isNullable())
{
/// First write to the null map.
const auto & nullable_type = static_cast<const DataTypeNullable &>(*type);
const auto & nested_type = nullable_type.getNestedType();
const auto & nullable_col = static_cast<const ColumnNullable &>(*column);
const auto & nested_col = nullable_col.getNestedColumn();
std::string filename = name + NULL_MAP_EXTENSION;
ColumnStream & stream = *column_streams[filename];
auto null_map_type = std::make_shared<DataTypeUInt8>();
writeColumn(nullable_col.getNullMapColumn(), null_map_type, stream, offsets);
/// Then write data.
writeDataImpl(name, nested_type, nested_col, offsets, offset_columns, level, skip_offsets);
}
else if (auto type_arr = typeid_cast<const DataTypeArray *>(type.get()))
{
/// For arrays, you first need to serialize dimensions, and then values.
String size_name = DataTypeNested::extractNestedTableName(name)
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
const auto & column_array = typeid_cast<const ColumnArray &>(*column);
ColumnPtr next_level_offsets;
ColumnPtr lengths_column;
auto offsets_data_type = std::make_shared<DataTypeNumber<ColumnArray::Offset_t>>();
if (offsets)
{
/// Have offsets from prev level. Calculate offsets for next level.
next_level_offsets = offsets->clone();
const auto & array_offsets = column_array.getOffsets();
auto & next_level_offsets_column = typeid_cast<ColumnArray::ColumnOffsets_t &>(*next_level_offsets);
auto & next_level_offsets_data = next_level_offsets_column.getData();
for (auto & offset : next_level_offsets_data)
offset = offset ? array_offsets[offset - 1] : 0;
/// Calculate lengths of arrays and write them as a new array.
lengths_column = column_array.getLengthsColumn();
}
if (!skip_offsets && offset_columns.count(size_name) == 0)
{
offset_columns.insert(size_name);
ColumnStream & stream = *column_streams[size_name];
if (offsets)
writeColumn(lengths_column, offsets_data_type, stream, offsets);
else
writeColumn(column, type, stream, nullptr);
}
writeDataImpl(name, type_arr->getNestedType(), column_array.getDataPtr(),
offsets ? next_level_offsets : column_array.getOffsetsColumn(),
offset_columns, level + 1, skip_offsets);
}
else
{
ColumnStream & stream = *column_streams[name];
writeColumn(column, type, stream, offsets);
}
}
void IMergedBlockOutputStream::writeColumn(
const ColumnPtr & column,
const DataTypePtr & type,
IMergedBlockOutputStream::ColumnStream & stream,
ColumnPtr offsets)
{
std::shared_ptr<DataTypeArray> array_type_holder;
DataTypeArray * array_type;
ColumnPtr array_column;
if (offsets)
{
array_type_holder = std::make_shared<DataTypeArray>(type);
array_type = array_type_holder.get();
array_column = std::make_shared<ColumnArray>(column, offsets);
}
else
array_type = typeid_cast<DataTypeArray *>(type.get());
size_t size = offsets ? offsets->size() : column->size();
size_t prev_mark = 0; size_t prev_mark = 0;
while (prev_mark < size) while (prev_mark < size)
{ {
@ -237,26 +92,76 @@ void IMergedBlockOutputStream::writeColumn(
{ {
limit = storage.index_granularity; limit = storage.index_granularity;
/// There could already be enough data to compress into the new block.
/// Write marks.
if (stream.compressed.offset() >= min_compress_block_size) type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
stream.compressed.next(); {
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
return;
writeIntBinary(stream.plain_hashing.count(), stream.marks); String stream_name = IDataType::getFileNameForStream(name, substream_path);
writeIntBinary(stream.compressed.offset(), stream.marks);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.count(stream_name))
return;
ColumnStream & stream = *column_streams[stream_name];
/// There could already be enough data to compress into the new block.
if (stream.compressed.offset() >= min_compress_block_size)
stream.compressed.next();
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}, {});
} }
if (offsets) IDataType::OutputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
array_type->serializeBinaryBulk(*array_column, stream.compressed, prev_mark, limit); {
else if (array_type) bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
array_type->serializeOffsets(*column, stream.compressed, prev_mark, limit); if (is_offsets && skip_offsets)
else return nullptr;
type->serializeBinaryBulk(*column, stream.compressed, prev_mark, limit);
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.count(stream_name))
return nullptr;
return &column_streams[stream_name]->compressed;
};
type.serializeBinaryBulkWithMultipleStreams(column, stream_getter, prev_mark, limit, true, {});
/// So that marks point to the beginning of the next compressed block instead of to the end of the current one.
stream.compressed.nextIfAtEnd(); type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Don't write offsets more than one time for Nested type.
if (is_offsets && offset_columns.count(stream_name))
return;
column_streams[stream_name]->compressed.nextIfAtEnd();
}, {});
prev_mark += limit; prev_mark += limit;
} }
/// Memoize the offset streams of Nested types that have already been written; they will not be written again for the following columns of the same Nested structure.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets)
{
String stream_name = IDataType::getFileNameForStream(name, substream_path);
offset_columns.insert(stream_name);
}
}, {});
} }
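The offset_columns set is what keeps the shared sizes stream of a Nested structure from being written once per sibling column: the first column writes and memoizes it, and the stream getter then returns nullptr for the ArraySizes substream of every later column. A self-contained sketch of that claim-then-skip pattern (plain C++, names made up for illustration, not the ClickHouse API):

    #include <iostream>
    #include <set>
    #include <string>
    #include <utility>
    #include <vector>

    int main()
    {
        /// Streams already written while processing the current block.
        std::set<std::string> written_offsets;

        /// Both columns of a Nested structure "n" map their sizes to the same "n.size0" stream.
        std::vector<std::pair<std::string, std::vector<std::string>>> columns =
        {
            {"n.a", {"n.size0", "n.a"}},
            {"n.b", {"n.size0", "n.b"}},
        };

        for (const auto & [column, streams] : columns)
            for (const auto & stream : streams)
            {
                /// The first writer claims the shared stream; later writers skip it.
                if (stream == "n.size0" && !written_offsets.insert(stream).second)
                {
                    std::cout << column << ": skip " << stream << " (already written)\n";
                    continue;
                }
                std::cout << column << ": write " << stream << '\n';
            }
    }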
@ -324,7 +229,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
{ {
init(); init();
for (const auto & it : columns_list) for (const auto & it : columns_list)
addStream(part_path, it.name, *it.type, 0, 0, "", false); addStreams(part_path, it.name, *it.type, 0, false);
} }
MergedBlockOutputStream::MergedBlockOutputStream( MergedBlockOutputStream::MergedBlockOutputStream(
@ -350,7 +255,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
if (it2 != merged_column_to_size_.end()) if (it2 != merged_column_to_size_.end())
estimated_size = it2->second; estimated_size = it2->second;
} }
addStream(part_path, it.name, *it.type, estimated_size, 0, "", false); addStreams(part_path, it.name, *it.type, estimated_size, false);
} }
} }
@ -467,7 +372,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
block.checkNumberOfRows(); block.checkNumberOfRows();
size_t rows = block.rows(); size_t rows = block.rows();
/// The set of written offset columns so that you do not write mutual to nested structures columns several times
/// The set of written offset columns so that you do not write shared offsets of nested structures columns several times
OffsetColumns offset_columns; OffsetColumns offset_columns;
auto sort_description = storage.getSortDescription(); auto sort_description = storage.getSortDescription();
@ -513,18 +418,18 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
auto primary_column_it = primary_columns_name_to_position.find(it.name); auto primary_column_it = primary_columns_name_to_position.find(it.name);
if (primary_columns_name_to_position.end() != primary_column_it) if (primary_columns_name_to_position.end() != primary_column_it)
{ {
writeData(column.name, column.type, primary_columns[primary_column_it->second].column, offset_columns, 0, false); writeData(column.name, *column.type, *primary_columns[primary_column_it->second].column, offset_columns, false);
} }
else else
{ {
/// We rearrange the columns that are not included in the primary key here; Then the result is released - to save RAM. /// We rearrange the columns that are not included in the primary key here; Then the result is released - to save RAM.
ColumnPtr permutted_column = column.column->permute(*permutation, 0); ColumnPtr permutted_column = column.column->permute(*permutation, 0);
writeData(column.name, column.type, permutted_column, offset_columns, 0, false); writeData(column.name, *column.type, *permutted_column, offset_columns, false);
} }
} }
else else
{ {
writeData(column.name, column.type, column.column, offset_columns, 0, false); writeData(column.name, *column.type, *column.column, offset_columns, false);
} }
} }
@ -580,8 +485,8 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
column_streams.clear(); column_streams.clear();
for (size_t i = 0; i < block.columns(); ++i) for (size_t i = 0; i < block.columns(); ++i)
{ {
addStream(part_path, block.safeGetByPosition(i).name, addStreams(part_path, block.safeGetByPosition(i).name,
*block.safeGetByPosition(i).type, 0, 0, block.safeGetByPosition(i).name, skip_offsets); *block.safeGetByPosition(i).type, 0, skip_offsets);
} }
initialized = true; initialized = true;
} }
@ -592,7 +497,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
for (size_t i = 0; i < block.columns(); ++i) for (size_t i = 0; i < block.columns(); ++i)
{ {
const ColumnWithTypeAndName & column = block.safeGetByPosition(i); const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
writeData(column.name, column.type, column.column, offset_columns, 0, skip_offsets); writeData(column.name, *column.type, *column.column, offset_columns, skip_offsets);
} }
size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity; size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity;

View File

@ -62,12 +62,10 @@ protected:
using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>; using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>;
void addStream(const String & path, const String & name, const IDataType & type, size_t estimated_size, void addStreams(const String & path, const String & name, const IDataType & type, size_t estimated_size, bool skip_offsets);
size_t level, const String & filename, bool skip_offsets);
/// Write data of one column. /// Write data of one column.
void writeData(const String & name, const DataTypePtr & type, const ColumnPtr & column, void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns, bool skip_offsets);
OffsetColumns & offset_columns, size_t level, bool skip_offsets);
MergeTreeData & storage; MergeTreeData & storage;
@ -82,21 +80,13 @@ protected:
size_t aio_threshold; size_t aio_threshold;
CompressionSettings compression_settings; CompressionSettings compression_settings;
private:
/// Internal version of writeData.
void writeDataImpl(const String & name, const DataTypePtr & type, const ColumnPtr & column,
const ColumnPtr & offsets, OffsetColumns & offset_columns, size_t level, bool skip_offsets);
/// Writes column data into stream.
/// If type is Array, writes offsets only. To write array data, unpack array column and use offsets argument.
void writeColumn(const ColumnPtr & column, const DataTypePtr & type, ColumnStream & stream, ColumnPtr offsets);
}; };
/** To write one part. /** To write one part.
* The data refers to one partition, and is written in one part. * The data refers to one partition, and is written in one part.
*/ */
class MergedBlockOutputStream : public IMergedBlockOutputStream class MergedBlockOutputStream final : public IMergedBlockOutputStream
{ {
public: public:
MergedBlockOutputStream( MergedBlockOutputStream(
@ -118,7 +108,7 @@ public:
/// If the data is pre-sorted. /// If the data is pre-sorted.
void write(const Block & block) override; void write(const Block & block) override;
/** If the data is not sorted, but we have previously calculated the permutation, after which they will be sorted.
/** If the data is not sorted, but we have previously calculated the permutation, that will sort it.
* This method is used to save RAM, since you do not need to keep two blocks at once - the original one and the sorted one. * This method is used to save RAM, since you do not need to keep two blocks at once - the original one and the sorted one.
*/ */
void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation); void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation);
@ -155,7 +145,7 @@ private:
/// Writes only those columns that are in `block` /// Writes only those columns that are in `block`
class MergedColumnOnlyOutputStream : public IMergedBlockOutputStream class MergedColumnOnlyOutputStream final : public IMergedBlockOutputStream
{ {
public: public:
MergedColumnOnlyOutputStream( MergedColumnOnlyOutputStream(

View File

@ -20,7 +20,6 @@
#include <DataStreams/IBlockOutputStream.h> #include <DataStreams/IBlockOutputStream.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
@ -31,10 +30,7 @@
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin" #define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
#define DBMS_STORAGE_LOG_MARKS_FILE_EXTENSION ".mrk"
#define DBMS_STORAGE_LOG_MARKS_FILE_NAME "__marks.mrk" #define DBMS_STORAGE_LOG_MARKS_FILE_NAME "__marks.mrk"
#define DBMS_STORAGE_LOG_NULL_MARKS_FILE_NAME "__null_marks.mrk"
#define DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION ".null.bin"
namespace DB namespace DB
@ -44,6 +40,7 @@ namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int EMPTY_LIST_OF_COLUMNS_PASSED; extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int DUPLICATE_COLUMN; extern const int DUPLICATE_COLUMN;
extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT; extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT;
} }
@ -60,21 +57,6 @@ public:
column_types(column_names.size()), column_types(column_names.size()),
storage(storage_), storage(storage_),
mark_number(mark_number_), mark_number(mark_number_),
null_mark_number(0),
rows_limit(rows_limit_),
max_read_buffer_size(max_read_buffer_size_)
{
}
LogBlockInputStream(
size_t block_size_, const Names & column_names_, StorageLog & storage_,
size_t mark_number_, size_t null_mark_number_, size_t rows_limit_, size_t max_read_buffer_size_)
: block_size(block_size_),
column_names(column_names_),
column_types(column_names.size()),
storage(storage_),
mark_number(mark_number_),
null_mark_number(null_mark_number_),
rows_limit(rows_limit_), rows_limit(rows_limit_),
max_read_buffer_size(max_read_buffer_size_) max_read_buffer_size(max_read_buffer_size_)
{ {
@ -103,7 +85,6 @@ private:
DataTypes column_types; DataTypes column_types;
StorageLog & storage; StorageLog & storage;
size_t mark_number; /// from what mark to read data size_t mark_number; /// from what mark to read data
size_t null_mark_number;
size_t rows_limit; /// The maximum number of rows that can be read size_t rows_limit; /// The maximum number of rows that can be read
size_t rows_read = 0; size_t rows_read = 0;
size_t max_read_buffer_size; size_t max_read_buffer_size;
@ -122,11 +103,10 @@ private:
CompressedReadBuffer compressed; CompressedReadBuffer compressed;
}; };
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>; using FileStreams = std::map<std::string, Stream>;
FileStreams streams; FileStreams streams;
void addStream(const String & name, const IDataType & type, size_t level = 0); void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read, bool read_offsets = true);
void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read, size_t level = 0, bool read_offsets = true);
}; };
@ -136,12 +116,8 @@ public:
explicit LogBlockOutputStream(StorageLog & storage_) explicit LogBlockOutputStream(StorageLog & storage_)
: storage(storage_), : storage(storage_),
lock(storage.rwlock), lock(storage.rwlock),
marks_stream(storage.marks_file.path(), 4096, O_APPEND | O_CREAT | O_WRONLY), marks_stream(storage.marks_file.path(), 4096, O_APPEND | O_CREAT | O_WRONLY)
null_marks_stream(storage.has_nullable_columns ?
std::make_unique<WriteBufferFromFile>(storage.null_marks_file.path(), 4096, O_APPEND | O_CREAT | O_WRONLY) : nullptr)
{ {
for (const auto & column : storage.getColumnsList())
addStream(column.name, *column.type);
} }
~LogBlockOutputStream() override ~LogBlockOutputStream() override
@ -185,22 +161,21 @@ private:
} }
}; };
using Mark = StorageLog::Mark;
using MarksForColumns = std::vector<std::pair<size_t, Mark>>; using MarksForColumns = std::vector<std::pair<size_t, Mark>>;
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>; using FileStreams = std::map<std::string, Stream>;
FileStreams streams; FileStreams streams;
using OffsetColumns = std::set<std::string>; using WrittenStreams = std::set<std::string>;
WriteBufferFromFile marks_stream; /// Declared below `lock` to make the file open when rwlock is captured. WriteBufferFromFile marks_stream; /// Declared below `lock` to make the file open when rwlock is captured.
std::unique_ptr<WriteBufferFromFile> null_marks_stream;
void addStream(const String & name, const IDataType & type, size_t level = 0);
void addNullStream(const String & name);
void writeData(const String & name, const IDataType & type, const IColumn & column, void writeData(const String & name, const IDataType & type, const IColumn & column,
MarksForColumns & out_marks, MarksForColumns & out_null_marks, MarksForColumns & out_marks,
OffsetColumns & offset_columns, size_t level = 0); WrittenStreams & written_streams);
void writeMarks(MarksForColumns marks, bool write_null_marks);
void writeMarks(MarksForColumns && marks);
}; };
@ -218,20 +193,17 @@ Block LogBlockInputStream::readImpl()
/// If the files are not open, then open them. /// If the files are not open, then open them.
if (streams.empty()) if (streams.empty())
{ {
std::shared_lock<std::shared_mutex> lock(storage.rwlock);
for (size_t i = 0, size = column_names.size(); i < size; ++i) for (size_t i = 0, size = column_names.size(); i < size; ++i)
{ {
const auto & name = column_names[i]; const auto & name = column_names[i];
column_types[i] = storage.getDataTypeByName(name); column_types[i] = storage.getDataTypeByName(name);
addStream(name, *column_types[i]);
} }
} }
/// How many rows to read for the next block. /// How many rows to read for the next block.
size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read); size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);
/// Pointers to offset columns, mutual for columns from nested data structures
/// Pointers to offset columns, shared for columns from nested data structures
using OffsetColumns = std::map<std::string, ColumnPtr>; using OffsetColumns = std::map<std::string, ColumnPtr>;
OffsetColumns offset_columns; OffsetColumns offset_columns;
@ -245,23 +217,8 @@ Block LogBlockInputStream::readImpl()
bool read_offsets = true; bool read_offsets = true;
const IDataType * observed_type;
bool is_nullable;
if (column.type->isNullable())
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(*column.type);
observed_type = nullable_type.getNestedType().get();
is_nullable = true;
}
else
{
observed_type = column.type.get();
is_nullable = false;
}
/// For nested structures, remember pointers to columns with offsets /// For nested structures, remember pointers to columns with offsets
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(observed_type)) if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(column.type.get()))
{ {
String name = DataTypeNested::extractNestedTableName(column.name); String name = DataTypeNested::extractNestedTableName(column.name);
@ -271,15 +228,13 @@ Block LogBlockInputStream::readImpl()
read_offsets = false; /// on previous iterations the offsets were already read by `readData` read_offsets = false; /// on previous iterations the offsets were already read by `readData`
column.column = std::make_shared<ColumnArray>(type_arr->getNestedType()->createColumn(), offset_columns[name]); column.column = std::make_shared<ColumnArray>(type_arr->getNestedType()->createColumn(), offset_columns[name]);
if (is_nullable)
column.column = std::make_shared<ColumnNullable>(column.column, std::make_shared<ColumnUInt8>());
} }
else else
column.column = column.type->createColumn(); column.column = column.type->createColumn();
try try
{ {
readData(name, *column.type, *column.column, max_rows_to_read, 0, read_offsets); readData(name, *column.type, *column.column, max_rows_to_read, read_offsets);
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -307,88 +262,30 @@ Block LogBlockInputStream::readImpl()
} }
void LogBlockInputStream::addStream(const String & name, const IDataType & type, size_t level) void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read, bool with_offsets)
{ {
if (type.isNullable()) IDataType::InputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
{ {
/// First create the stream that handles the null map of the given column. if (!with_offsets && !path.empty() && path.back().type == IDataType::Substream::ArraySizes)
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type); return nullptr;
const IDataType & nested_type = *nullable_type.getNestedType();
std::string filename = name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION; String stream_name = IDataType::getFileNameForStream(name, path);
streams.emplace(filename, std::make_unique<Stream>( const auto & file_it = storage.files.find(stream_name);
storage.files[filename].data_file.path(), if (storage.files.end() == file_it)
null_mark_number throw Exception("Logical error: no information about file " + stream_name + " in StorageLog", ErrorCodes::LOGICAL_ERROR);
? storage.files[filename].marks[null_mark_number].offset
: 0,
max_read_buffer_size));
/// Then create the stream that handles the data of the given column. auto it = streams.try_emplace(stream_name,
addStream(name, nested_type, level); file_it->second.data_file.path(),
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/// For arrays, separate files are used for sizes.
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!streams.count(size_name))
streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(
storage.files[size_name].data_file.path(),
mark_number
? storage.files[size_name].marks[mark_number].offset
: 0,
max_read_buffer_size)));
addStream(name, *type_arr->getNestedType(), level + 1);
}
else
streams[name] = std::make_unique<Stream>(
storage.files[name].data_file.path(),
mark_number mark_number
? storage.files[name].marks[mark_number].offset ? file_it->second.marks[mark_number].offset
: 0, : 0,
max_read_buffer_size); max_read_buffer_size).first;
}
return &it->second.compressed;
};
void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read, type.deserializeBinaryBulkWithMultipleStreams(column, stream_getter, max_rows_to_read, 0, true, {}); /// TODO Use avg_value_size_hint.
size_t level, bool read_offsets)
{
if (type.isNullable())
{
/// First read from the null map.
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *nullable_type.getNestedType();
ColumnNullable & nullable_col = static_cast<ColumnNullable &>(column);
IColumn & nested_col = *nullable_col.getNestedColumn();
DataTypeUInt8{}.deserializeBinaryBulk(nullable_col.getNullMapConcreteColumn(),
streams[name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION]->compressed, max_rows_to_read, 0);
/// Then read data.
readData(name, nested_type, nested_col, max_rows_to_read, level, read_offsets);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/// For arrays, you first need to deserialize the dimensions, and then the values.
if (read_offsets)
{
type_arr->deserializeOffsets(
column,
streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
max_rows_to_read);
}
if (column.size())
readData(
name,
*type_arr->getNestedType(),
typeid_cast<ColumnArray &>(column).getData(),
typeid_cast<const ColumnArray &>(column).getOffsets()[column.size() - 1],
level + 1);
}
else
type.deserializeBinaryBulk(column, streams[name]->compressed, max_rows_to_read, 0); /// TODO Use avg_value_size_hint.
} }
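The Log reader now opens a file the first time any column asks for it; try_emplace makes this idempotent, so the shared .size0 stream of a Nested structure is opened once and then reused by every sibling column. A tiny standalone illustration of that property (std::string stands in for the Stream object of this revision):

    #include <iostream>
    #include <map>
    #include <string>

    int main()
    {
        std::map<std::string, std::string> streams;

        /// First use constructs the mapped value in place.
        auto first = streams.try_emplace("n.size0", "opened while reading n.a").first;

        /// Later uses are no-ops: the existing entry is returned and nothing is reopened.
        auto second = streams.try_emplace("n.size0", "opened while reading n.b").first;

        std::cout << second->second << '\n';    /// prints "opened while reading n.a"
        std::cout << (first == second) << '\n'; /// prints 1: both columns share one stream
    }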
@ -396,25 +293,19 @@ void LogBlockOutputStream::write(const Block & block)
{ {
storage.check(block, true); storage.check(block, true);
/// The set of written offset columns so that you do not write mutual columns for nested structures multiple times
/// The set of written offset columns so that you do not write shared offsets of columns for nested structures multiple times
OffsetColumns offset_columns; WrittenStreams written_streams;
MarksForColumns marks; MarksForColumns marks;
marks.reserve(storage.file_count); marks.reserve(storage.file_count);
MarksForColumns null_marks;
if (null_marks_stream)
null_marks.reserve(storage.null_file_count);
for (size_t i = 0; i < block.columns(); ++i) for (size_t i = 0; i < block.columns(); ++i)
{ {
const ColumnWithTypeAndName & column = block.safeGetByPosition(i); const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
writeData(column.name, *column.type, *column.column, marks, null_marks, offset_columns); writeData(column.name, *column.type, *column.column, marks, written_streams);
} }
writeMarks(marks, false); writeMarks(std::move(marks));
if (null_marks_stream)
writeMarks(null_marks, true);
} }
@ -426,15 +317,13 @@ void LogBlockOutputStream::writeSuffix()
/// Finish write. /// Finish write.
marks_stream.next(); marks_stream.next();
if (null_marks_stream)
null_marks_stream->next();
for (FileStreams::iterator it = streams.begin(); it != streams.end(); ++it) for (auto & name_stream : streams)
it->second->finalize(); name_stream.second.finalize();
std::vector<Poco::File> column_files; std::vector<Poco::File> column_files;
for (auto & pair : streams) for (const auto & name_stream : streams)
column_files.push_back(storage.files[pair.first].data_file); column_files.push_back(storage.files[name_stream.first].data_file);
column_files.push_back(storage.marks_file); column_files.push_back(storage.marks_file);
storage.file_checker.update(column_files.begin(), column_files.end()); storage.file_checker.update(column_files.begin(), column_files.end());
@ -443,123 +332,68 @@ void LogBlockOutputStream::writeSuffix()
} }
void LogBlockOutputStream::addStream(const String & name, const IDataType & type, size_t level)
{
if (type.isNullable())
{
/// First create the stream that handles the null map of the given column.
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *nullable_type.getNestedType();
std::string filename = name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION;
streams.emplace(filename, std::make_unique<Stream>(storage.files[filename].data_file.path(),
storage.max_compress_block_size));
/// Then create the stream that handles the data of the given column.
addStream(name, nested_type, level);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/// For arrays separate files are used for sizes.
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!streams.count(size_name))
streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(
storage.files[size_name].data_file.path(), storage.max_compress_block_size)));
addStream(name, *type_arr->getNestedType(), level + 1);
}
else
streams[name] = std::make_unique<Stream>(storage.files[name].data_file.path(), storage.max_compress_block_size);
}
void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column,
MarksForColumns & out_marks, MarksForColumns & out_null_marks, MarksForColumns & out_marks,
OffsetColumns & offset_columns, size_t level) WrittenStreams & written_streams)
{ {
if (type.isNullable()) type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
{ {
/// First write to the null map. String stream_name = IDataType::getFileNameForStream(name, path);
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type); if (written_streams.count(stream_name))
const IDataType & nested_type = *nullable_type.getNestedType(); return;
const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(column); const auto & file = storage.files[stream_name];
const IColumn & nested_col = *nullable_col.getNestedColumn(); const auto stream_it = streams.try_emplace(stream_name, storage.files[stream_name].data_file.path(), storage.max_compress_block_size).first;
std::string filename = name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION;
Mark mark; Mark mark;
mark.rows = (storage.files[filename].marks.empty() ? 0 : storage.files[filename].marks.back().rows) + column.size(); mark.rows = (file.marks.empty() ? 0 : file.marks.back().rows) + column.size();
mark.offset = streams[filename]->plain_offset + streams[filename]->plain.count(); mark.offset = stream_it->second.plain_offset + stream_it->second.plain.count();
out_null_marks.emplace_back(storage.files[filename].column_index, mark); out_marks.emplace_back(file.column_index, mark);
}, {});
DataTypeUInt8{}.serializeBinaryBulk(nullable_col.getNullMapConcreteColumn(), streams[filename]->compressed, 0, 0); IDataType::OutputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
streams[filename]->compressed.next();
/// Then write data.
writeData(name, nested_type, nested_col, out_marks, out_null_marks, offset_columns, level);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{ {
/// For arrays, you first need to serialize the dimensions, and then the values. String stream_name = IDataType::getFileNameForStream(name, path);
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); if (written_streams.count(stream_name))
return nullptr;
if (offset_columns.count(size_name) == 0) auto it = streams.find(stream_name);
{ if (streams.end() == it)
offset_columns.insert(size_name); throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR);
return &it->second.compressed;
};
Mark mark; type.serializeBinaryBulkWithMultipleStreams(column, stream_getter, 0, 0, true, {});
mark.rows = (storage.files[size_name].marks.empty() ? 0 : storage.files[size_name].marks.back().rows) + column.size();
mark.offset = streams[size_name]->plain_offset + streams[size_name]->plain.count();
out_marks.push_back(std::make_pair(storage.files[size_name].column_index, mark)); type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
type_arr->serializeOffsets(column, streams[size_name]->compressed, 0, 0);
streams[size_name]->compressed.next();
}
writeData(name, *type_arr->getNestedType(), typeid_cast<const ColumnArray &>(column).getData(),
out_marks, out_null_marks, offset_columns, level + 1);
}
else
{ {
Mark mark; String stream_name = IDataType::getFileNameForStream(name, path);
mark.rows = (storage.files[name].marks.empty() ? 0 : storage.files[name].marks.back().rows) + column.size(); if (!written_streams.emplace(stream_name).second)
mark.offset = streams[name]->plain_offset + streams[name]->plain.count(); return;
out_marks.push_back(std::make_pair(storage.files[name].column_index, mark)); auto it = streams.find(stream_name);
if (streams.end() == it)
type.serializeBinaryBulk(column, streams[name]->compressed, 0, 0); throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR);
streams[name]->compressed.next(); it->second.compressed.next();
} }, {});
} }
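A worked example of the mark bookkeeping above (all numbers hypothetical): if the file of stream x already holds one mark {rows: 8192, offset: 0}, the block being written has 8192 rows, and plain_offset + plain.count() currently equals 65536, then the new mark is

    mark.rows   = 8192 + 8192 = 16384
    mark.offset = 0 + 65536   = 65536

i.e. each mark stores the cumulative row count of the stream together with the byte position in its compressed file at which the new block starts.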
static bool ColumnIndexLess(const std::pair<size_t, Mark> & a, const std::pair<size_t, Mark> & b)
{
return a.first < b.first;
}
void LogBlockOutputStream::writeMarks(MarksForColumns marks, bool write_null_marks) void LogBlockOutputStream::writeMarks(MarksForColumns && marks)
{ {
size_t count = write_null_marks ? storage.null_file_count : storage.file_count; if (marks.size() != storage.file_count)
WriteBufferFromFile & stream = write_null_marks ? *null_marks_stream : marks_stream;
const Names & names = write_null_marks ? storage.null_map_filenames : storage.column_names;
if (marks.size() != count)
throw Exception("Wrong number of marks generated from block. Makes no sense.", ErrorCodes::LOGICAL_ERROR); throw Exception("Wrong number of marks generated from block. Makes no sense.", ErrorCodes::LOGICAL_ERROR);
sort(marks.begin(), marks.end(), ColumnIndexLess); std::sort(marks.begin(), marks.end(), [](const auto & a, const auto & b) { return a.first < b.first; });
for (size_t i = 0; i < marks.size(); ++i) for (const auto & mark : marks)
{ {
Mark mark = marks[i].second; writeIntBinary(mark.second.rows, marks_stream);
writeIntBinary(mark.second.offset, marks_stream);
writeIntBinary(mark.rows, stream); size_t column_index = mark.first;
writeIntBinary(mark.offset, stream); storage.files[storage.column_names[column_index]].marks.push_back(mark.second);
size_t column_index = marks[i].first;
storage.files[names[column_index]].marks.push_back(mark);
} }
} }
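The marks file written above therefore consists of fixed-width records, one per written block: for each block there is one (rows, offset) pair per data file, ordered by column_index. With two data files the layout would be

    block 0: rows[file 0], offset[file 0], rows[file 1], offset[file 1]
    block 1: rows[file 0], offset[file 0], rows[file 1], offset[file 1]
    ...

Every value is written with writeIntBinary, so one record occupies file_count * sizeof(Mark) bytes, which is exactly the divisor loadMarks checks against the file size below.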
@ -573,83 +407,45 @@ StorageLog::StorageLog(
size_t max_compress_block_size_) size_t max_compress_block_size_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, : IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), name(name_), columns(columns_), path(path_), name(name_), columns(columns_),
loaded_marks(false), max_compress_block_size(max_compress_block_size_), max_compress_block_size(max_compress_block_size_),
file_checker(path + escapeForFileName(name) + '/' + "sizes.json") file_checker(path + escapeForFileName(name) + '/' + "sizes.json")
{ {
if (columns->empty()) if (columns->empty())
throw Exception("Empty list of columns passed to StorageLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED); throw Exception("Empty list of columns passed to StorageLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
/// create files if they do not exist /// create files if they do not exist
Poco::File(path + escapeForFileName(name) + '/').createDirectories(); Poco::File(path + escapeForFileName(name) + '/').createDirectories();
for (const auto & column : getColumnsList()) for (const auto & column : getColumnsList())
addFile(column.name, *column.type); addFiles(column.name, *column.type);
marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME); marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
if (has_nullable_columns)
null_marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_NULL_MARKS_FILE_NAME);
} }
void StorageLog::addFile(const String & column_name, const IDataType & type, size_t level) void StorageLog::addFiles(const String & column_name, const IDataType & type)
{ {
if (files.end() != files.find(column_name)) if (files.end() != files.find(column_name))
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageLog.", throw Exception("Duplicate column with name " + column_name + " in constructor of StorageLog.",
ErrorCodes::DUPLICATE_COLUMN); ErrorCodes::DUPLICATE_COLUMN);
if (type.isNullable()) IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path)
{ {
/// First add the file describing the null map of the column. String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
has_nullable_columns = true;
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type); if (!files.count(stream_name))
const IDataType & actual_type = *nullable_type.getNestedType();
std::string filename = column_name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION;
ColumnData & column_data = files.emplace(filename, ColumnData{}).first->second;
++null_file_count;
column_data.column_index = null_map_filenames.size();
column_data.data_file = Poco::File{
path + escapeForFileName(name) + '/' + escapeForFileName(column_name)
+ DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION};
null_map_filenames.push_back(filename);
/// Then add the file describing the column data.
addFile(column_name, actual_type, level);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String size_name = DataTypeNested::extractNestedTableName(column_name) + size_column_suffix;
if (files.end() == files.find(size_name))
{ {
ColumnData & column_data = files.insert(std::make_pair(size_name, ColumnData())).first->second; ColumnData & column_data = files[stream_name];
++file_count; column_data.column_index = file_count;
column_data.column_index = column_names.size();
column_data.data_file = Poco::File{ column_data.data_file = Poco::File{
path + escapeForFileName(name) + '/' path + escapeForFileName(name) + '/' + stream_name + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION};
+ escapeForFileName(DataTypeNested::extractNestedTableName(column_name))
+ size_column_suffix + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION};
column_names.push_back(size_name); column_names.push_back(stream_name);
++file_count;
} }
};
addFile(column_name, *type_arr->getNestedType(), level + 1); type.enumerateStreams(stream_callback, {});
}
else
{
ColumnData & column_data = files.insert(std::make_pair(column_name, ColumnData())).first->second;
++file_count;
column_data.column_index = column_names.size();
column_data.data_file = Poco::File{
path + escapeForFileName(name) + '/'
+ escapeForFileName(column_name) + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION};
column_names.push_back(column_name);
}
} }
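For illustration (a hypothetical table, not from the commit): a Log table with columns x UInt64, s Nullable(String), n Nested(a UInt8, b UInt8) would be expected to get one files entry per substream, roughly

    x        -> column_index 0
    s.null   -> column_index 1   (null map of s)
    s        -> column_index 2
    n.size0  -> column_index 3   (array sizes, shared by n.a and n.b)
    n.a      -> column_index 4
    n.b      -> column_index 5

each with a data_file named after the stream plus the .bin extension (special characters escaped). The if (!files.count(stream_name)) check is what keeps the shared n.size0 stream from being registered twice.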
@ -660,45 +456,24 @@ void StorageLog::loadMarks()
if (loaded_marks) if (loaded_marks)
return; return;
loadMarksImpl(false);
if (has_nullable_columns)
loadMarksImpl(true);
loaded_marks = true;
}
void StorageLog::loadMarksImpl(bool load_null_marks)
{
using FilesByIndex = std::vector<Files_t::iterator>; using FilesByIndex = std::vector<Files_t::iterator>;
size_t count = load_null_marks ? null_file_count : file_count; FilesByIndex files_by_index(file_count);
Poco::File & marks_file_handle = load_null_marks ? null_marks_file : marks_file;
FilesByIndex files_by_index(count);
for (Files_t::iterator it = files.begin(); it != files.end(); ++it) for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
{
bool has_null_extension = endsWith(it->first, DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION);
if (!load_null_marks && has_null_extension)
continue;
if (load_null_marks && !has_null_extension)
continue;
files_by_index[it->second.column_index] = it; files_by_index[it->second.column_index] = it;
}
if (marks_file_handle.exists()) if (marks_file.exists())
{ {
size_t file_size = marks_file_handle.getSize(); size_t file_size = marks_file.getSize();
if (file_size % (count * sizeof(Mark)) != 0) if (file_size % (file_count * sizeof(Mark)) != 0)
throw Exception("Size of marks file is inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT); throw Exception("Size of marks file is inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
int marks_count = file_size / (count * sizeof(Mark)); size_t marks_count = file_size / (file_count * sizeof(Mark));
for (size_t i = 0; i < files_by_index.size(); ++i) for (auto & file : files_by_index)
files_by_index[i]->second.marks.reserve(marks_count); file->second.marks.reserve(marks_count);
ReadBufferFromFile marks_rb(marks_file_handle.path(), 32768); ReadBufferFromFile marks_rb(marks_file.path(), 32768);
while (!marks_rb.eof()) while (!marks_rb.eof())
{ {
for (size_t i = 0; i < files_by_index.size(); ++i) for (size_t i = 0; i < files_by_index.size(); ++i)
@ -710,12 +485,8 @@ void StorageLog::loadMarksImpl(bool load_null_marks)
} }
} }
} }
}
loaded_marks = true;
size_t StorageLog::marksCount()
{
return files.begin()->second.marks.size();
} }
@ -730,16 +501,14 @@ void StorageLog::rename(const String & new_path_to_db, const String & new_databa
name = new_table_name; name = new_table_name;
file_checker.setPath(path + escapeForFileName(name) + '/' + "sizes.json"); file_checker.setPath(path + escapeForFileName(name) + '/' + "sizes.json");
for (Files_t::iterator it = files.begin(); it != files.end(); ++it) for (auto & file : files)
it->second.data_file = Poco::File(path + escapeForFileName(name) + '/' + Poco::Path(it->second.data_file.path()).getFileName()); file.second.data_file = Poco::File(path + escapeForFileName(name) + '/' + Poco::Path(file.second.data_file.path()).getFileName());
marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME); marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
if (has_nullable_columns)
null_marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_NULL_MARKS_FILE_NAME);
} }
const Marks & StorageLog::getMarksWithRealRowCount() const const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const
{ {
auto init_column_type = [&]() auto init_column_type = [&]()
{ {
@ -792,71 +561,26 @@ BlockInputStreams StorageLog::read(
const Marks & marks = getMarksWithRealRowCount(); const Marks & marks = getMarksWithRealRowCount();
size_t marks_size = marks.size(); size_t marks_size = marks.size();
/// Given a stream_num, return the start of the area from which
/// it can read data, i.e. a mark number.
auto mark_from_stream_num = [&](size_t stream_num)
{
/// The computation below reflects the fact that marks
/// are uniformly distributed among streams.
return stream_num * marks_size / num_streams;
};
/// Given a stream_num, get the parameters that specify the area
/// from which it can read data, i.e. a mark number and a
/// maximum number of rows.
auto get_reader_parameters = [&](size_t stream_num)
{
size_t mark_number = mark_from_stream_num(stream_num);
size_t cur_total_row_count = stream_num == 0
? 0
: marks[mark_number - 1].rows;
size_t next_total_row_count = marks[mark_from_stream_num(stream_num + 1) - 1].rows;
size_t rows_limit = next_total_row_count - cur_total_row_count;
return std::make_pair(mark_number, rows_limit);
};
if (num_streams > marks_size) if (num_streams > marks_size)
num_streams = marks_size; num_streams = marks_size;
size_t max_read_buffer_size = context.getSettingsRef().max_read_buffer_size; size_t max_read_buffer_size = context.getSettingsRef().max_read_buffer_size;
if (has_nullable_columns) for (size_t stream = 0; stream < num_streams; ++stream)
{ {
for (size_t stream = 0; stream < num_streams; ++stream) size_t mark_begin = stream * marks_size / num_streams;
{ size_t mark_end = (stream + 1) * marks_size / num_streams;
size_t mark_number;
size_t rows_limit;
std::tie(mark_number, rows_limit) = get_reader_parameters(stream);
res.push_back(std::make_shared<LogBlockInputStream>( size_t rows_begin = mark_begin ? marks[mark_begin - 1].rows : 0;
max_block_size, size_t rows_end = mark_end ? marks[mark_end - 1].rows : 0;
column_names,
*this,
mark_number,
mark_number,
rows_limit,
max_read_buffer_size));
}
}
else
{
for (size_t stream = 0; stream < num_streams; ++stream)
{
size_t mark_number;
size_t rows_limit;
std::tie(mark_number, rows_limit) = get_reader_parameters(stream);
res.push_back(std::make_shared<LogBlockInputStream>( res.emplace_back(std::make_shared<LogBlockInputStream>(
max_block_size, max_block_size,
column_names, column_names,
*this, *this,
mark_number, mark_begin,
rows_limit, rows_end - rows_begin,
max_read_buffer_size)); max_read_buffer_size));
}
} }
return res; return res;
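With the marks divided evenly, each input stream gets a contiguous range of marks and turns it into a row window using the cumulative row counts stored in the marks. A hypothetical example with marks_size = 10, num_streams = 3 and marks[i].rows = (i + 1) * 8192:

    stream 0: marks [0, 3)  -> rows_begin = 0,     rows_end = 24576, reads 24576 rows starting at mark 0
    stream 1: marks [3, 6)  -> rows_begin = 24576, rows_end = 49152, reads 24576 rows starting at mark 3
    stream 2: marks [6, 10) -> rows_begin = 49152, rows_end = 81920, reads 32768 rows starting at mark 6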

View File

@ -14,27 +14,7 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
/** Implements simple table engine without support of indices.
{
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
/** Offsets to every single set of values.
* These sets are the same size in different columns.
* They are needed so that you can read the data in several threads.
*/
struct Mark
{
size_t rows; /// How many lines are contained in this set and all previous ones.
size_t offset; /// The offset to the set in the compressed file.
};
using Marks = std::vector<Mark>;
/** Implements a table engine that is suitable for logs.
* Keys are not supported.
* The data is stored in a compressed form. * The data is stored in a compressed form.
*/ */
class StorageLog : public ext::shared_ptr_helper<StorageLog>, public IStorage class StorageLog : public ext::shared_ptr_helper<StorageLog>, public IStorage
@ -60,18 +40,6 @@ public:
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override; void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
/// Column data
struct ColumnData
{
/// Specifies the column number in the marks file.
/// Does not necessarily match the column number among the columns of the table: columns with lengths of arrays are also numbered here.
size_t column_index;
Poco::File data_file;
Marks marks;
};
using Files_t = std::map<String, ColumnData>;
bool checkData() const override; bool checkData() const override;
protected: protected:
@ -95,20 +63,42 @@ private:
mutable std::shared_mutex rwlock; mutable std::shared_mutex rwlock;
/** A mark is an offset to some row number in the data file of a column.
* Marks are needed so that the data can be read in several threads.
*/
struct Mark
{
size_t rows; /// How many rows are contained in this block and in all preceding blocks.
size_t offset; /// The offset of the block in the compressed file.
};
using Marks = std::vector<Mark>;
/// Column data
struct ColumnData
{
/// Specifies the column number in the marks file.
/// Does not necessarily match the column number among the columns of the table: columns with lengths of arrays are also numbered here.
size_t column_index;
Poco::File data_file;
Marks marks;
};
using Files_t = std::map<String, ColumnData>;
Files_t files; /// name -> data Files_t files; /// name -> data
Names column_names; /// column_index -> name Names column_names; /// column_index -> name
Names null_map_filenames;
Poco::File marks_file; Poco::File marks_file;
Poco::File null_marks_file;
bool loaded_marks; /// The order of adding files should not change: it corresponds to the order of the columns in the marks file.
bool has_nullable_columns = false; void addFiles(const String & column_name, const IDataType & type);
bool loaded_marks = false;
size_t max_compress_block_size; size_t max_compress_block_size;
size_t file_count = 0; size_t file_count = 0;
size_t null_file_count = 0;
FileChecker file_checker; FileChecker file_checker;
@ -117,24 +107,19 @@ private:
     /// You can not call with a write locked `rwlock`.
     void loadMarks();
 
-    /// Can be called with any state of `rwlock`.
-    size_t marksCount();
-
-    void loadMarksImpl(bool load_null_marks);
-
     /// The order of adding files should not change: it corresponds to the order of the columns in the marks file.
     void addFile(const String & column_name, const IDataType & type, size_t level = 0);
 
     /** For normal columns, the number of rows in the block is specified in the marks.
       * For array columns and nested structures, there are more than one group of marks that correspond to different files
-      *  - for insides (file name.bin) - the total number of array elements in the block is specified,
+      *  - for elements (file name.bin) - the total number of array elements in the block is specified,
       *  - for array sizes (file name.size0.bin) - the number of rows (the whole arrays themselves) in the block is specified.
       *
       * Return the first group of marks that contain the number of rows, but not the internals of the arrays.
       */
     const Marks & getMarksWithRealRowCount() const;
 
-    std::string getFullPath() const { return path + escapeForFileName(name) + '/';}
+    std::string getFullPath() const { return path + escapeForFileName(name) + '/'; }
 };
 
 }
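
The comment on getMarksWithRealRowCount can be illustrated with toy numbers (nothing below is taken from a real table, and the map keyed by stream name is a simplification of Files_t): for a column arr Array(UInt8) the sizes file carries marks that count whole rows, while the elements file counts flattened array elements, so the sizes group is the one with the "real" row count.

    #include <cstddef>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    struct Mark { size_t rows; size_t offset; };
    using Marks = std::vector<Mark>;

    int main()
    {
        std::map<std::string, Marks> files;
        files["arr.size0"] = {{10, 0}, {25, 120}};   /// cumulative count of rows (whole arrays)
        files["arr"]       = {{40, 0}, {95, 700}};   /// cumulative count of flattened elements

        const Marks & real_rows = files["arr.size0"];   /// the group that counts rows
        std::cout << "rows in table: " << real_rows.back().rows << "\n";   /// 25
        return 0;
    }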


@ -36,7 +36,6 @@
 #include <Poco/DirectoryIterator.h>
 
 #define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
-#define DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION ".null.bin"
 
 
 namespace DB
@ -88,8 +87,7 @@ private:
     using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
     FileStreams streams;
 
-    void addStream(const String & name, const IDataType & type, size_t level = 0);
-    void readData(const String & name, const IDataType & type, IColumn & column, size_t limit, size_t level = 0, bool read_offsets = true);
+    void readData(const String & name, const IDataType & type, IColumn & column, size_t limit, bool read_offsets = true);
 };
@ -99,8 +97,6 @@ public:
     explicit TinyLogBlockOutputStream(StorageTinyLog & storage_)
         : storage(storage_)
     {
-        for (const auto & col : storage.getColumnsList())
-            addStream(col.name, *col.type);
     }
 
     ~TinyLogBlockOutputStream() override
@ -143,10 +139,9 @@ private:
     using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
     FileStreams streams;
 
-    using OffsetColumns = std::set<std::string>;
+    using WrittenStreams = std::set<std::string>;
 
-    void addStream(const String & name, const IDataType & type, size_t level = 0);
-    void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns, size_t level = 0);
+    void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams);
 };
@ -191,11 +186,10 @@ Block TinyLogBlockInputStream::readImpl()
         {
             const auto & name = column_names[i];
             column_types[i] = storage.getDataTypeByName(name);
-
-            addStream(name, *column_types[i]);
         }
     }
 
-    /// Pointers to offset columns, mutual for columns from nested data structures
+    /// Pointers to offset columns, shared for columns from nested data structures
     using OffsetColumns = std::map<std::string, ColumnPtr>;
     OffsetColumns offset_columns;
@ -209,23 +203,8 @@ Block TinyLogBlockInputStream::readImpl()
         bool read_offsets = true;
 
-        const IDataType * observed_type;
-        bool is_nullable;
-
-        if (column.type->isNullable())
-        {
-            const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(*column.type);
-            observed_type = nullable_type.getNestedType().get();
-            is_nullable = true;
-        }
-        else
-        {
-            observed_type = column.type.get();
-            is_nullable = false;
-        }
-
         /// For nested structures, remember pointers to columns with offsets
-        if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(observed_type))
+        if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(column.type.get()))
         {
             String nested_name = DataTypeNested::extractNestedTableName(column.name);
@ -235,15 +214,13 @@ Block TinyLogBlockInputStream::readImpl()
                 read_offsets = false; /// on previous iterations, the offsets were already calculated by `readData`
 
             column.column = std::make_shared<ColumnArray>(type_arr->getNestedType()->createColumn(), offset_columns[nested_name]);
-
-            if (is_nullable)
-                column.column = std::make_shared<ColumnNullable>(column.column, std::make_shared<ColumnUInt8>());
         }
         else
             column.column = column.type->createColumn();
 
         try
         {
-            readData(name, *column.type, *column.column, block_size, 0, read_offsets);
+            readData(name, *column.type, *column.column, block_size, read_offsets);
         }
         catch (Exception & e)
         {
@ -265,139 +242,40 @@ Block TinyLogBlockInputStream::readImpl()
 }
 
 
-void TinyLogBlockInputStream::addStream(const String & name, const IDataType & type, size_t level)
-{
-    if (type.isNullable())
-    {
-        /// First create the stream that handles the null map of the given column.
-        const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
-        const IDataType & nested_type = *nullable_type.getNestedType();
-
-        std::string filename = name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION;
-        streams.emplace(filename, std::make_unique<Stream>(storage.files[filename].data_file.path(), max_read_buffer_size));
-
-        /// Then create the stream that handles the data of the given column.
-        addStream(name, nested_type, level);
-    }
-    else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
-    {
-        /// For arrays separate files are used for sizes.
-        String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
-        if (!streams.count(size_name))
-            streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(storage.files[size_name].data_file.path(), max_read_buffer_size)));
-
-        addStream(name, *type_arr->getNestedType(), level + 1);
-    }
-    else
-        streams[name] = std::make_unique<Stream>(storage.files[name].data_file.path(), max_read_buffer_size);
-}
-
-
-void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t limit, size_t level, bool read_offsets)
-{
-    if (type.isNullable())
-    {
-        const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
-        const IDataType & nested_type = *nullable_type.getNestedType();
-
-        if (!column.isNullable())
-            throw Exception{"Internal error: the column " + name + " is not nullable", ErrorCodes::LOGICAL_ERROR};
-
-        ColumnNullable & nullable_col = static_cast<ColumnNullable &>(column);
-        IColumn & nested_col = *nullable_col.getNestedColumn();
-
-        /// First read from the null map.
-        DataTypeUInt8{}.deserializeBinaryBulk(nullable_col.getNullMapConcreteColumn(),
-            streams[name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION]->compressed, limit, 0);
-
-        /// Then read data.
-        readData(name, nested_type, nested_col, limit, level, read_offsets);
-    }
-    else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
-    {
-        /// For arrays you first need to deserialize dimensions, and then the values.
-        if (read_offsets)
-        {
-            type_arr->deserializeOffsets(
-                column,
-                streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
-                limit);
-        }
-
-        if (column.size())
-        {
-            IColumn & nested_column = typeid_cast<ColumnArray &>(column).getData();
-            size_t nested_limit = typeid_cast<ColumnArray &>(column).getOffsets()[column.size() - 1];
-            readData(name, *type_arr->getNestedType(), nested_column, nested_limit, level + 1);
-
-            if (nested_column.size() != nested_limit)
-                throw Exception("Cannot read array data for all offsets", ErrorCodes::CANNOT_READ_ALL_DATA);
-        }
-    }
-    else
-        type.deserializeBinaryBulk(column, streams[name]->compressed, limit, 0); /// TODO Use avg_value_size_hint.
-}
+void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t limit, bool with_offsets)
+{
+    IDataType::InputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
+    {
+        if (!with_offsets && !path.empty() && path.back().type == IDataType::Substream::ArraySizes)
+            return nullptr;
+
+        String stream_name = IDataType::getFileNameForStream(name, path);
+
+        if (!streams.count(stream_name))
+            streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(), max_read_buffer_size);
+
+        return &streams[stream_name]->compressed;
+    };
+
+    type.deserializeBinaryBulkWithMultipleStreams(column, stream_getter, limit, 0, true, {}); /// TODO Use avg_value_size_hint.
+}
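
The stream-getter pattern in the new readData can be modelled with plain standard C++. The sketch below is only an analogy, not the storage code: std::istringstream stands in for the compressed file streams, a two-value enum stands in for the substream path, and the stream names follow the name / name.size0 convention from the comments in this diff. The point is the control flow: the deserializer asks for one buffer per substream, the storage maps the substream to a stream name, and returning nullptr skips a substream (here, array sizes that were already read on a previous iteration).

    #include <iostream>
    #include <map>
    #include <sstream>
    #include <string>

    enum class Substream { ArraySizes, ArrayElements };

    static std::string fileNameForStream(const std::string & column, Substream substream)
    {
        return substream == Substream::ArraySizes ? column + ".size0" : column;
    }

    int main()
    {
        /// Pretend these are the per-stream files of a column `arr Array(UInt8)`.
        std::map<std::string, std::istringstream> streams;
        streams.emplace("arr.size0", std::istringstream("3 2 0"));
        streams.emplace("arr", std::istringstream("1 2 3 4 5"));

        bool with_offsets = false;   /// offsets were already read for this nested table

        auto stream_getter = [&](const std::string & column, Substream substream) -> std::istream *
        {
            if (!with_offsets && substream == Substream::ArraySizes)
                return nullptr;      /// skip this substream entirely
            return &streams.at(fileNameForStream(column, substream));
        };

        if (std::istream * in = stream_getter("arr", Substream::ArrayElements))
        {
            int value;
            while (*in >> value)
                std::cout << value << ' ';
            std::cout << '\n';
        }
        return 0;
    }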
-void TinyLogBlockOutputStream::addStream(const String & name, const IDataType & type, size_t level)
-{
-    if (type.isNullable())
-    {
-        /// First create the stream that handles the null map of the given column.
-        const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
-        const IDataType & nested_type = *nullable_type.getNestedType();
-
-        std::string filename = name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION;
-        streams.emplace(filename, std::make_unique<Stream>(storage.files[filename].data_file.path(), storage.max_compress_block_size));
-
-        /// Then create the stream that handles the data of the given column.
-        addStream(name, nested_type, level);
-    }
-    else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
-    {
-        /// For arrays separate files are used for sizes.
-        String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
-        if (!streams.count(size_name))
-            streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size)));
-
-        addStream(name, *type_arr->getNestedType(), level + 1);
-    }
-    else
-        streams[name] = std::make_unique<Stream>(storage.files[name].data_file.path(), storage.max_compress_block_size);
-}
-
-
-void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column,
-    OffsetColumns & offset_columns, size_t level)
-{
-    if (type.isNullable())
-    {
-        /// First write to the null map.
-        const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
-        const IDataType & nested_type = *nullable_type.getNestedType();
-
-        const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(column);
-        const IColumn & nested_col = *nullable_col.getNestedColumn();
-
-        DataTypeUInt8{}.serializeBinaryBulk(nullable_col.getNullMapConcreteColumn(),
-            streams[name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION]->compressed, 0, 0);
-
-        /// Then write data.
-        writeData(name, nested_type, nested_col, offset_columns, level);
-    }
-    else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
-    {
-        /// For arrays, you first need to serialize the dimensions, and then the values.
-        String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
-
-        if (offset_columns.count(size_name) == 0)
-        {
-            offset_columns.insert(size_name);
-            type_arr->serializeOffsets(column, streams[size_name]->compressed, 0, 0);
-        }
-
-        writeData(name, *type_arr->getNestedType(), typeid_cast<const ColumnArray &>(column).getData(), offset_columns, level + 1);
-    }
-    else
-        type.serializeBinaryBulk(column, streams[name]->compressed, 0, 0);
-}
+void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams)
+{
+    IDataType::OutputStreamGetter stream_getter = [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
+    {
+        String stream_name = IDataType::getFileNameForStream(name, path);
+
+        if (!streams.count(stream_name))
+            streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(), storage.max_compress_block_size);
+        else if (!written_streams.insert(stream_name).second)
+            return nullptr;
+
+        return &streams[stream_name]->compressed;
+    };
+
+    type.serializeBinaryBulkWithMultipleStreams(column, stream_getter, 0, 0, true, {});
+}
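
The WrittenStreams set exists because several columns of one nested structure resolve to the same sizes stream, and that shared stream must be serialized only once per block. Below is a minimal standalone sketch of that dedup rule; the substream names n.size0 / n.x / n.y are hypothetical.

    #include <iostream>
    #include <set>
    #include <string>
    #include <vector>

    int main()
    {
        /// Substreams requested while writing one block of a nested structure `n`:
        /// both n.x and n.y resolve their sizes to the same stream "n.size0".
        std::vector<std::string> substreams = {"n.size0", "n.x", "n.size0", "n.y"};

        std::set<std::string> written_streams;
        for (const auto & name : substreams)
        {
            if (!written_streams.insert(name).second)
            {
                std::cout << name << ": already written in this block, skipped\n";
                continue;
            }
            std::cout << name << ": written\n";
        }
        return 0;
    }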
@ -425,13 +303,13 @@ void TinyLogBlockOutputStream::write(const Block & block)
 {
     storage.check(block, true);
 
-    /// The set of written offset columns so that you do not write mutual columns for nested structures multiple times
-    OffsetColumns offset_columns;
+    /// The set of written offset columns so that you do not write shared columns for nested structures multiple times
+    WrittenStreams written_streams;
 
     for (size_t i = 0; i < block.columns(); ++i)
     {
         const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
-        writeData(column.name, *column.type, *column.column, offset_columns);
+        writeData(column.name, *column.type, *column.column, written_streams);
     }
 }
@ -463,52 +341,29 @@ StorageTinyLog::StorageTinyLog(
     }
 
     for (const auto & col : getColumnsList())
-        addFile(col.name, *col.type);
+        addFiles(col.name, *col.type);
 }
 
 
-void StorageTinyLog::addFile(const String & column_name, const IDataType & type, size_t level)
+void StorageTinyLog::addFiles(const String & column_name, const IDataType & type)
 {
     if (files.end() != files.find(column_name))
         throw Exception("Duplicate column with name " + column_name + " in constructor of StorageTinyLog.",
             ErrorCodes::DUPLICATE_COLUMN);
 
-    if (type.isNullable())
-    {
-        /// First add the file describing the null map of the column.
-        const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
-        const IDataType & actual_type = *nullable_type.getNestedType();
-
-        std::string filename = column_name + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION;
-        ColumnData & column_data = files.emplace(filename, ColumnData{}).first->second;
-        column_data.data_file = Poco::File{
-            path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION};
-
-        /// Then add the file describing the column data.
-        addFile(column_name, actual_type, level);
-    }
-    else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
-    {
-        String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
-        String size_name = DataTypeNested::extractNestedTableName(column_name) + size_column_suffix;
-
-        if (files.end() == files.find(size_name))
-        {
-            ColumnData column_data;
-            files.insert(std::make_pair(size_name, column_data));
-            files[size_name].data_file = Poco::File(
-                path + escapeForFileName(name) + '/' + escapeForFileName(DataTypeNested::extractNestedTableName(column_name)) + size_column_suffix + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
-        }
-
-        addFile(column_name, *type_arr->getNestedType(), level + 1);
-    }
-    else
-    {
-        ColumnData column_data;
-        files.insert(std::make_pair(column_name, column_data));
-        files[column_name].data_file = Poco::File(
-            path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
-    }
-}
+    IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path)
+    {
+        String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
+        if (!files.count(stream_name))
+        {
+            ColumnData column_data;
+            files.insert(std::make_pair(stream_name, column_data));
+            files[stream_name].data_file = Poco::File(
+                path + escapeForFileName(name) + '/' + stream_name + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
+        }
+    };
+
+    type.enumerateStreams(stream_callback, {});
+}
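
The effect of enumerating substreams per column can be approximated with a toy recursion. This is not the IDataType interface, just an illustration of the resulting file layout: the ".sizeN" numbering for nested arrays follows the old code and the comments in this diff, while the simplified Type struct and the callback signature are made up for the example.

    #include <cstddef>
    #include <functional>
    #include <iostream>
    #include <string>

    struct Type
    {
        bool is_array;
        const Type * nested;
    };

    static void enumerateStreams(const std::string & column, const Type & type, size_t level,
                                 const std::function<void(const std::string &)> & callback)
    {
        if (type.is_array && type.nested)
        {
            callback(column + ".size" + std::to_string(level));          /// sizes substream of this level
            enumerateStreams(column, *type.nested, level + 1, callback); /// then the elements
        }
        else
            callback(column);                                            /// plain data substream
    }

    int main()
    {
        Type uint8{false, nullptr};
        Type arr{true, &uint8};
        Type arr_arr{true, &arr};

        enumerateStreams("arr", arr_arr, 0, [](const std::string & stream_name)
        {
            std::cout << stream_name << ".bin\n";   /// arr.size0.bin, arr.size1.bin, arr.bin
        });
        return 0;
    }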
@ -548,15 +403,6 @@ BlockOutputStreamPtr StorageTinyLog::write(
 }
 
-void StorageTinyLog::drop()
-{
-    for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
-    {
-        if (it->second.data_file.exists())
-            it->second.data_file.remove();
-    }
-}
-
 bool StorageTinyLog::checkData() const
 {
     return file_checker.check();


@ -39,8 +39,6 @@ public:
     BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
 
-    void drop() override;
-
     void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
 
     bool checkData() const override;
@ -68,6 +66,7 @@ private:
     Logger * log;
 
     void addFile(const String & column_name, const IDataType & type, size_t level = 0);
+    void addFiles(const String & column_name, const IDataType & type);
 
 protected:
     StorageTinyLog(


@ -1,3 +0,0 @@
DROP TABLE IF EXISTS arrays_test;
CREATE TABLE arrays_test (s String, arr Array(UInt8)) ENGINE = Memory;
INSERT INTO arrays_test VALUES ('Hello', [1,2]), ('World', [3,4,5]), ('Goodbye', []);


@ -1,3 +0,0 @@
Hello [1,2]
World [3,4,5]
Goodbye []


@ -1 +0,0 @@
SELECT * FROM arrays_test


@ -1,5 +0,0 @@
Hello 1
Hello 2
World 3
World 4
World 5


@ -1 +0,0 @@
SELECT s, arr FROM arrays_test ARRAY JOIN arr


@ -1,5 +0,0 @@
Hello [1,2] 1
Hello [1,2] 2
World [3,4,5] 3
World [3,4,5] 4
World [3,4,5] 5


@ -1 +0,0 @@
SELECT s, arr, a FROM arrays_test ARRAY JOIN arr AS a


@ -1,5 +0,0 @@
Hello [1,2] 1 1
Hello [1,2] 2 2
World [3,4,5] 3 1
World [3,4,5] 4 2
World [3,4,5] 5 3


@ -1 +0,0 @@
SELECT s, arr, a, num FROM arrays_test ARRAY JOIN arr AS a, arrayEnumerate(arr) AS num


@ -1,5 +0,0 @@
Hello [1,2] 1 1 [1,2]
Hello [1,2] 2 2 [1,2]
World [3,4,5] 3 1 [1,2,3]
World [3,4,5] 4 2 [1,2,3]
World [3,4,5] 5 3 [1,2,3]


@ -1 +0,0 @@
SELECT s, arr, a, num, arrayEnumerate(arr) FROM arrays_test ARRAY JOIN arr AS a, arrayEnumerate(arr) AS num


@ -1,5 +0,0 @@
Hello [1,2] 1 2
Hello [1,2] 2 3
World [3,4,5] 3 4
World [3,4,5] 4 5
World [3,4,5] 5 6


@ -1 +0,0 @@
SELECT s, arr, a, mapped FROM arrays_test ARRAY JOIN arr AS a, arrayMap(x -> x + 1, arr) AS mapped


@ -1,5 +0,0 @@
Hello [1,2] 1 1 2
Hello [1,2] 2 2 3
World [3,4,5] 3 1 4
World [3,4,5] 4 2 5
World [3,4,5] 5 3 6


@ -1 +0,0 @@
SELECT s, arr, a, num, mapped FROM arrays_test ARRAY JOIN arr AS a, arrayEnumerate(arr) AS num, arrayMap(x -> x + 1, arr) AS mapped


@ -1,2 +0,0 @@
SELECT sumArray(arr), sumArrayIf(arr, s LIKE '%l%'), sumArrayIf(arr, s LIKE '%e%') FROM arrays_test


@ -0,0 +1,34 @@
Hello [1,2]
World [3,4,5]
Goodbye []
Hello 1
Hello 2
World 3
World 4
World 5
Hello [1,2] 1
Hello [1,2] 2
World [3,4,5] 3
World [3,4,5] 4
World [3,4,5] 5
Hello [1,2] 1 1
Hello [1,2] 2 2
World [3,4,5] 3 1
World [3,4,5] 4 2
World [3,4,5] 5 3
Hello [1,2] 1 1 [1,2]
Hello [1,2] 2 2 [1,2]
World [3,4,5] 3 1 [1,2,3]
World [3,4,5] 4 2 [1,2,3]
World [3,4,5] 5 3 [1,2,3]
Hello [1,2] 1 2
Hello [1,2] 2 3
World [3,4,5] 3 4
World [3,4,5] 4 5
World [3,4,5] 5 6
Hello [1,2] 1 1 2
Hello [1,2] 2 2 3
World [3,4,5] 3 1 4
World [3,4,5] 4 2 5
World [3,4,5] 5 3 6
15 15 3


@ -0,0 +1,11 @@
DROP TABLE IF EXISTS arrays_test;
CREATE TABLE arrays_test (s String, arr Array(UInt8)) ENGINE = Memory;
INSERT INTO arrays_test VALUES ('Hello', [1,2]), ('World', [3,4,5]), ('Goodbye', []);
SELECT * FROM arrays_test;
SELECT s, arr FROM arrays_test ARRAY JOIN arr;
SELECT s, arr, a FROM arrays_test ARRAY JOIN arr AS a;
SELECT s, arr, a, num FROM arrays_test ARRAY JOIN arr AS a, arrayEnumerate(arr) AS num;
SELECT s, arr, a, num, arrayEnumerate(arr) FROM arrays_test ARRAY JOIN arr AS a, arrayEnumerate(arr) AS num;
SELECT s, arr, a, mapped FROM arrays_test ARRAY JOIN arr AS a, arrayMap(x -> x + 1, arr) AS mapped;
SELECT s, arr, a, num, mapped FROM arrays_test ARRAY JOIN arr AS a, arrayEnumerate(arr) AS num, arrayMap(x -> x + 1, arr) AS mapped;
SELECT sumArray(arr), sumArrayIf(arr, s LIKE '%l%'), sumArrayIf(arr, s LIKE '%e%') FROM arrays_test;


@ -1,3 +0,0 @@
DROP TABLE IF EXISTS nested_test;
CREATE TABLE nested_test (s String, nest Nested(x UInt8, y UInt32)) ENGINE = Memory;
INSERT INTO nested_test VALUES ('Hello', [1,2], [10,20]), ('World', [3,4,5], [30,40,50]), ('Goodbye', [], []);


@ -1,3 +0,0 @@
Hello [1,2] [10,20]
World [3,4,5] [30,40,50]
Goodbye [] []


@ -1 +0,0 @@
SELECT * FROM nested_test


@ -1,5 +0,0 @@
Hello 1 10
Hello 2 20
World 3 30
World 4 40
World 5 50


@ -1 +0,0 @@
SELECT s, nest.x, nest.y FROM nested_test ARRAY JOIN nest


@ -1,5 +0,0 @@
Hello 1 [10,20]
Hello 2 [10,20]
World 3 [30,40,50]
World 4 [30,40,50]
World 5 [30,40,50]


@ -1 +0,0 @@
SELECT s, nest.x, nest.y FROM nested_test ARRAY JOIN nest.x


@ -1,5 +0,0 @@
Hello 1 10
Hello 2 20
World 3 30
World 4 40
World 5 50


@ -1 +0,0 @@
SELECT s, nest.x, nest.y FROM nested_test ARRAY JOIN nest.x, nest.y


@ -1,5 +0,0 @@
Hello 1 10
Hello 2 20
World 3 30
World 4 40
World 5 50


@ -1 +0,0 @@
SELECT s, n.x, n.y FROM nested_test ARRAY JOIN nest AS n


@ -1,5 +0,0 @@
Hello 1 10 [1,2]
Hello 2 20 [1,2]
World 3 30 [3,4,5]
World 4 40 [3,4,5]
World 5 50 [3,4,5]


@ -1 +0,0 @@
SELECT s, n.x, n.y, nest.x FROM nested_test ARRAY JOIN nest AS n


@ -1,5 +0,0 @@
Hello 1 10 [1,2] [10,20]
Hello 2 20 [1,2] [10,20]
World 3 30 [3,4,5] [30,40,50]
World 4 40 [3,4,5] [30,40,50]
World 5 50 [3,4,5] [30,40,50]


@ -1 +0,0 @@
SELECT s, n.x, n.y, nest.x, nest.y FROM nested_test ARRAY JOIN nest AS n


@ -1,5 +0,0 @@
Hello 1 10 [1,2] [10,20] 1
Hello 2 20 [1,2] [10,20] 2
World 3 30 [3,4,5] [30,40,50] 1
World 4 40 [3,4,5] [30,40,50] 2
World 5 50 [3,4,5] [30,40,50] 3


@ -1 +0,0 @@
SELECT s, n.x, n.y, nest.x, nest.y, num FROM nested_test ARRAY JOIN nest AS n, arrayEnumerate(nest.x) AS num


@ -0,0 +1,38 @@
Hello [1,2] [10,20]
World [3,4,5] [30,40,50]
Goodbye [] []
Hello 1 10
Hello 2 20
World 3 30
World 4 40
World 5 50
Hello 1 [10,20]
Hello 2 [10,20]
World 3 [30,40,50]
World 4 [30,40,50]
World 5 [30,40,50]
Hello 1 10
Hello 2 20
World 3 30
World 4 40
World 5 50
Hello 1 10
Hello 2 20
World 3 30
World 4 40
World 5 50
Hello 1 10 [1,2]
Hello 2 20 [1,2]
World 3 30 [3,4,5]
World 4 40 [3,4,5]
World 5 50 [3,4,5]
Hello 1 10 [1,2] [10,20]
Hello 2 20 [1,2] [10,20]
World 3 30 [3,4,5] [30,40,50]
World 4 40 [3,4,5] [30,40,50]
World 5 50 [3,4,5] [30,40,50]
Hello 1 10 [1,2] [10,20] 1
Hello 2 20 [1,2] [10,20] 2
World 3 30 [3,4,5] [30,40,50] 1
World 4 40 [3,4,5] [30,40,50] 2
World 5 50 [3,4,5] [30,40,50] 3


@ -0,0 +1,11 @@
DROP TABLE IF EXISTS nested_test;
CREATE TABLE nested_test (s String, nest Nested(x UInt8, y UInt32)) ENGINE = Memory;
INSERT INTO nested_test VALUES ('Hello', [1,2], [10,20]), ('World', [3,4,5], [30,40,50]), ('Goodbye', [], []);
SELECT * FROM nested_test;
SELECT s, nest.x, nest.y FROM nested_test ARRAY JOIN nest;
SELECT s, nest.x, nest.y FROM nested_test ARRAY JOIN nest.x;
SELECT s, nest.x, nest.y FROM nested_test ARRAY JOIN nest.x, nest.y;
SELECT s, n.x, n.y FROM nested_test ARRAY JOIN nest AS n;
SELECT s, n.x, n.y, nest.x FROM nested_test ARRAY JOIN nest AS n;
SELECT s, n.x, n.y, nest.x, nest.y FROM nested_test ARRAY JOIN nest AS n;
SELECT s, n.x, n.y, nest.x, nest.y, num FROM nested_test ARRAY JOIN nest AS n, arrayEnumerate(nest.x) AS num;


@ -1,15 +0,0 @@
DROP TABLE IF EXISTS alter_test;
CREATE TABLE alter_test (CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) ENGINE = MergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192);
INSERT INTO alter_test VALUES (1, '2014-01-01', 2, 3, [1,2,3], ['a','b','c'], 4);
ALTER TABLE alter_test ADD COLUMN Added0 UInt32;
ALTER TABLE alter_test ADD COLUMN Added2 UInt32;
ALTER TABLE alter_test ADD COLUMN Added1 UInt32 AFTER Added0;
ALTER TABLE alter_test ADD COLUMN AddedNested1 Nested(A UInt32, B UInt64) AFTER Added2;
ALTER TABLE alter_test ADD COLUMN AddedNested1.C Array(String) AFTER AddedNested1.B;
ALTER TABLE alter_test ADD COLUMN AddedNested2 Nested(A UInt32, B UInt64) AFTER AddedNested1;
DESC TABLE alter_test;


@ -1,11 +0,0 @@
CounterID UInt32
StartDate Date
UserID UInt32
VisitID UInt32
Added0 String
Added1 UInt32
Added2 UInt32
AddedNested1.A Array(UInt32)
AddedNested1.C Array(String)
AddedNested2.A Array(UInt32)
AddedNested2.B Array(UInt64)


@ -1,10 +0,0 @@
ALTER TABLE alter_test DROP COLUMN ToDrop;
ALTER TABLE alter_test MODIFY COLUMN Added0 String;
ALTER TABLE alter_test DROP COLUMN NestedColumn.A;
ALTER TABLE alter_test DROP COLUMN NestedColumn.S;
ALTER TABLE alter_test DROP COLUMN AddedNested1.B;
DESC TABLE alter_test;


@ -1 +0,0 @@
1 2014-01-01 2 3 0 0 [] [] [] []


@ -1,3 +0,0 @@
SELECT * FROM alter_test;
DROP TABLE alter_test;


@ -13,3 +13,15 @@ AddedNested1.B Array(UInt64)
AddedNested1.C Array(String)
AddedNested2.A Array(UInt32)
AddedNested2.B Array(UInt64)
CounterID UInt32
StartDate Date
UserID UInt32
VisitID UInt32
Added0 String
Added1 UInt32
Added2 UInt32
AddedNested1.A Array(UInt32)
AddedNested1.C Array(String)
AddedNested2.A Array(UInt32)
AddedNested2.B Array(UInt64)
1 2014-01-01 2 3 0 0 [] [] [] []


@ -0,0 +1,30 @@
DROP TABLE IF EXISTS test.alter_test;
CREATE TABLE test.alter_test (CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) ENGINE = MergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192);
INSERT INTO test.alter_test VALUES (1, '2014-01-01', 2, 3, [1,2,3], ['a','b','c'], 4);
ALTER TABLE test.alter_test ADD COLUMN Added0 UInt32;
ALTER TABLE test.alter_test ADD COLUMN Added2 UInt32;
ALTER TABLE test.alter_test ADD COLUMN Added1 UInt32 AFTER Added0;
ALTER TABLE test.alter_test ADD COLUMN AddedNested1 Nested(A UInt32, B UInt64) AFTER Added2;
ALTER TABLE test.alter_test ADD COLUMN AddedNested1.C Array(String) AFTER AddedNested1.B;
ALTER TABLE test.alter_test ADD COLUMN AddedNested2 Nested(A UInt32, B UInt64) AFTER AddedNested1;
DESC TABLE test.alter_test;
ALTER TABLE test.alter_test DROP COLUMN ToDrop;
ALTER TABLE test.alter_test MODIFY COLUMN Added0 String;
ALTER TABLE test.alter_test DROP COLUMN NestedColumn.A;
ALTER TABLE test.alter_test DROP COLUMN NestedColumn.S;
ALTER TABLE test.alter_test DROP COLUMN AddedNested1.B;
DESC TABLE test.alter_test;
SELECT * FROM test.alter_test;
DROP TABLE test.alter_test;