dbms: changed signature of IDataType::serializeBinary [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-12-11 20:31:39 +00:00
parent 7f27cbbcb7
commit de13137958
13 changed files with 111 additions and 144 deletions

View File

@ -42,7 +42,7 @@ public:
void serializeBinary(const Field & field, WriteBuffer & ostr) const;
void deserializeBinary(Field & field, ReadBuffer & istr) const;
void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const;
void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const;
void serializeText(const Field & field, WriteBuffer & ostr) const;
void deserializeText(Field & field, ReadBuffer & istr) const;

View File

@ -50,7 +50,7 @@ public:
*/
/** Записать только значения, без размеров. Вызывающая сторона также должна куда-нибудь записать смещения. */
void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const;
void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const;
/** Прочитать только значения, без размеров.
* При этом, в column уже заранее должны быть считаны все размеры.
@ -58,7 +58,7 @@ public:
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const;
/** Записать размеры. */
void serializeOffsets(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const;
void serializeOffsets(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const;
/** Прочитать размеры. Вызывайте этот метод перед чтением значений. */
void deserializeOffsets(IColumn & column, ReadBuffer & istr, size_t limit) const;

View File

@ -34,7 +34,7 @@ public:
void serializeBinary(const Field & field, WriteBuffer & ostr) const;
void deserializeBinary(Field & field, ReadBuffer & istr) const;
void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const;
void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const;
void serializeText(const Field & field, WriteBuffer & ostr) const;

View File

@ -28,7 +28,7 @@ public:
void serializeBinary(const Field & field, WriteBuffer & ostr) const;
void deserializeBinary(Field & field, ReadBuffer & istr) const;
void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const;
void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const;
void serializeText(const Field & field, WriteBuffer & ostr) const;

View File

@ -38,14 +38,13 @@ public:
virtual void deserializeBinary(Field & field, ReadBuffer & istr) const = 0;
/** Сериализация столбца.
* Можно передать callback, который будет вызван для некоторых значений.
* callback вызывается для 0-го значения и возвращает индекс следующего значения,
* для которого его следует вызвать.
* Это может быть использовано для одновременной записи индексного файла.
* offset и limit используются, чтобы сериализовать часть столбца.
* limit = 0 - означает - не ограничено.
* offset не должен быть больше размера столбца.
* offset + limit может быть больше размера столбца
* - в этом случае, столбец сериализуется до конца.
*/
typedef boost::function<size_t()> WriteCallback;
virtual void serializeBinary(const IColumn & column, WriteBuffer & ostr,
WriteCallback callback = WriteCallback()) const = 0;
virtual void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const = 0;
/** Считать не более limit значений. */
virtual void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const = 0;

View File

@ -24,7 +24,7 @@ public:
void deserializeBinary(Field & field, ReadBuffer & istr) const { throwNoSerialization(); }
void serializeBinary(const IColumn & column, WriteBuffer & ostr,
WriteCallback callback = WriteCallback()) const { throwNoSerialization(); }
size_t offset = 0, size_t limit = 0) const { throwNoSerialization(); }
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const { throwNoSerialization(); }

View File

@ -21,7 +21,6 @@ namespace DB
template <typename FieldType, typename ColumnType>
class IDataTypeNumberFixed : public IDataTypeNumber<FieldType>
{
typedef IDataType::WriteCallback WriteCallback;
public:
/** Формат платформозависимый (зависит от представления данных в памяти).
*/
@ -40,25 +39,16 @@ public:
field = typename NearestFieldType<FieldType>::Type(x);
}
void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const
void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const
{
const typename ColumnType::Container_t & x = dynamic_cast<const ColumnType &>(column).getData();
size_t prev_callback_point = 0;
size_t next_callback_point = 0;
size_t size = x.size();
while (next_callback_point < size)
{
next_callback_point = callback ? callback() : size;
if (next_callback_point > size)
next_callback_point = size;
ostr.write(reinterpret_cast<const char *>(&x[prev_callback_point]),
sizeof(typename ColumnType::value_type) * (next_callback_point - prev_callback_point));
if (limit == 0 || offset + limit > size)
limit = size - offset;
prev_callback_point = next_callback_point;
}
ostr.write(reinterpret_cast<const char *>(&x[offset]), sizeof(typename ColumnType::value_type) * limit);
}
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const

View File

@ -24,7 +24,6 @@ namespace DB
template <typename FieldType, typename ColumnType>
class IDataTypeNumberVariable : public IDataTypeNumber<FieldType>
{
typedef IDataType::WriteCallback WriteCallback;
public:
void serializeBinary(const Field & field, WriteBuffer & ostr) const
{
@ -36,19 +35,17 @@ public:
readVarT(static_cast<typename ColumnType::value_type &>(boost::get<FieldType &>(field)), istr);
}
void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const
void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const
{
const typename ColumnType::Container_t & x = dynamic_cast<const ColumnType &>(column).getData();
size_t size = x.size();
size_t end = limit && offset + limit < size
? offset + limit
: size;
size_t next_callback_point = callback ? callback() : 0;
for (size_t i = 0; i < size; ++i)
{
if (next_callback_point && i == next_callback_point)
next_callback_point = callback();
for (size_t i = offset; i < end; ++i)
writeVarT(x[i], ostr);
}
}
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const

View File

@ -26,20 +26,19 @@ void DataTypeAggregateFunction::deserializeBinary(Field & field, ReadBuffer & is
field = value;
}
void DataTypeAggregateFunction::serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback) const
void DataTypeAggregateFunction::serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
{
const ColumnAggregateFunction & real_column = dynamic_cast<const ColumnAggregateFunction &>(column);
const ColumnAggregateFunction::Container_t & vec = real_column.getData();
size_t next_callback_point = callback ? callback() : 0;
size_t i = 0;
for (ColumnAggregateFunction::Container_t::const_iterator it = vec.begin(); it != vec.end(); ++it, ++i)
{
if (next_callback_point && i == next_callback_point)
next_callback_point = callback();
ColumnAggregateFunction::Container_t::const_iterator it = vec.begin() + offset;
ColumnAggregateFunction::Container_t::const_iterator end = limit ? it + limit : vec.end();
if (end > vec.end())
end = vec.end();
for (; it != end; ++it)
(*it)->serialize(ostr);
}
}
void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const

View File

@ -32,28 +32,17 @@ void DataTypeArray::deserializeBinary(Field & field, ReadBuffer & istr) const
}
static size_t adjustedWriteCallback(IDataType::WriteCallback & original_callback, const ColumnArray::Offsets_t & offsets)
{
size_t original_res = original_callback();
if (unlikely(original_res == 0))
return 0;
else if (unlikely(original_res >= offsets.size()))
return offsets.back();
else
return offsets[original_res - 1];
}
void DataTypeArray::serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback) const
void DataTypeArray::serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
{
const ColumnArray & column_array = dynamic_cast<const ColumnArray &>(column);
const ColumnArray::Offsets_t & offsets = column_array.getOffsets();
nested->serializeBinary(column_array.getData(), ostr,
callback
? boost::bind(adjustedWriteCallback, boost::ref(callback), boost::cref(offsets))
: WriteCallback());
size_t nested_offset = offset ? offsets[offset] : 0;
size_t nested_limit = limit && offset + limit < offsets.size()
? offsets[offset + limit] - offset
: 0;
nested->serializeBinary(column_array.getData(), ostr, nested_offset, nested_limit);
}
@ -71,7 +60,7 @@ void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr, size_
}
void DataTypeArray::serializeOffsets(const IColumn & column, WriteBuffer & ostr, WriteCallback callback) const
void DataTypeArray::serializeOffsets(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
{
const ColumnArray & column_array = dynamic_cast<const ColumnArray &>(column);
const ColumnArray::Offsets_t & offsets = column_array.getOffsets();
@ -80,16 +69,12 @@ void DataTypeArray::serializeOffsets(const IColumn & column, WriteBuffer & ostr,
if (!size)
return;
size_t next_callback_point = callback ? callback() : 0;
writeIntBinary(offsets[0], ostr);
for (size_t i = 1; i < size; ++i)
{
if (next_callback_point && i == next_callback_point)
next_callback_point = callback();
size_t end = limit && offset + limit < size
? offset + limit
: size;
for (size_t i = offset; i < end; ++i)
writeIntBinary(offsets[i] - offsets[i - 1], ostr);
}
}

View File

@ -38,26 +38,17 @@ void DataTypeFixedString::deserializeBinary(Field & field, ReadBuffer & istr) co
}
void DataTypeFixedString::serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback) const
void DataTypeFixedString::serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
{
const ColumnFixedArray & column_array = dynamic_cast<const ColumnFixedArray &>(column);
const ColumnUInt8::Container_t & data = dynamic_cast<const ColumnUInt8 &>(column_array.getData()).getData();
size_t prev_callback_point = 0;
size_t next_callback_point = 0;
size_t size = data.size() / n;
while (next_callback_point < size)
{
next_callback_point = callback ? callback() : size;
if (next_callback_point > size)
next_callback_point = size;
if (limit == 0 || offset + limit > size)
limit = size - offset;
ostr.write(reinterpret_cast<const char *>(&data[prev_callback_point * n]),
n * (next_callback_point - prev_callback_point));
prev_callback_point = next_callback_point;
}
ostr.write(reinterpret_cast<const char *>(&data[offset]), n * limit);
}

View File

@ -40,7 +40,7 @@ void DataTypeString::deserializeBinary(Field & field, ReadBuffer & istr) const
}
void DataTypeString::serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback) const
void DataTypeString::serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
{
const ColumnArray & column_array = dynamic_cast<const ColumnArray &>(column);
const ColumnUInt8::Container_t & data = dynamic_cast<const ColumnUInt8 &>(column_array.getData()).getData();
@ -50,16 +50,12 @@ void DataTypeString::serializeBinary(const IColumn & column, WriteBuffer & ostr,
if (!size)
return;
size_t next_callback_point = callback ? callback() : 0;
writeVarUInt(offsets[0] - 1, ostr);
ostr.write(reinterpret_cast<const char *>(&data[0]), offsets[0] - 1);
size_t end = limit && offset + limit < size
? offset + limit
: size;
for (size_t i = 1; i < size; ++i)
for (size_t i = offset; i < end; ++i)
{
if (next_callback_point && i == next_callback_point)
next_callback_point = callback();
UInt64 str_size = offsets[i] - offsets[i - 1] - 1;
writeVarUInt(str_size, ostr);
ostr.write(reinterpret_cast<const char *>(&data[offsets[i - 1]]), str_size);

View File

@ -221,6 +221,7 @@ private:
void writeData(const String & path, const String & name, const IDataType & type, const IColumn & column, size_t level = 0)
{
String escaped_column_name = escapeForFileName(name);
size_t size = column.size();
/// Для массивов требуется сначала сериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
@ -232,9 +233,15 @@ private:
CompressedWriteBuffer compressed(plain);
size_t prev_mark = 0;
type_arr->serializeOffsets(column, compressed,
boost::bind(&MergeTreeBlockOutputStream::writeCallback, this,
boost::ref(prev_mark), boost::ref(plain), boost::ref(compressed), boost::ref(marks)));
while (prev_mark < size)
{
/// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока)
writeIntBinary(plain.count(), marks);
writeIntBinary(compressed.offset(), marks);
type_arr->serializeOffsets(column, compressed, prev_mark, storage.index_granularity);
prev_mark += storage.index_granularity;
}
}
{
@ -242,27 +249,18 @@ private:
WriteBufferFromFile marks(path + escaped_column_name + ".mrk", 4096, flags);
CompressedWriteBuffer compressed(plain);
// TODO Для массивов здесь баг - засечки сериализуются неправильно.
size_t prev_mark = 0;
type.serializeBinary(column, compressed,
boost::bind(&MergeTreeBlockOutputStream::writeCallback, this,
boost::ref(prev_mark), boost::ref(plain), boost::ref(compressed), boost::ref(marks)));
while (prev_mark < size)
{
writeIntBinary(plain.count(), marks);
writeIntBinary(compressed.offset(), marks);
type.serializeBinary(column, compressed, prev_mark, storage.index_granularity);
prev_mark += storage.index_granularity;
}
}
}
/// Вызывается каждые index_granularity строк и пишет в файл с засечками (.mrk).
size_t writeCallback(size_t & prev_mark,
WriteBufferFromFile & plain,
CompressedWriteBuffer & compressed,
WriteBufferFromFile & marks)
{
/// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока)
writeIntBinary(plain.count(), marks);
writeIntBinary(compressed.offset(), marks);
prev_mark += storage.index_granularity;
return prev_mark;
}
};
@ -404,6 +402,8 @@ private:
/// Записать данные одного столбца.
void writeData(const String & name, const IDataType & type, const IColumn & column, size_t level = 0)
{
size_t size = column.size();
/// Для массивов требуется сначала сериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
{
@ -412,43 +412,53 @@ private:
ColumnStream & stream = *column_streams[size_name];
size_t prev_mark = 0;
type_arr->serializeOffsets(column, stream.compressed,
boost::bind(&MergedBlockOutputStream::writeCallback, this,
boost::ref(prev_mark), boost::ref(stream.plain), boost::ref(stream.compressed), boost::ref(stream.marks)));
while (prev_mark < size)
{
size_t limit = 0;
/// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк.
if (prev_mark == 0 && index_offset != 0)
{
limit = index_offset;
}
else
{
limit = storage.index_granularity;
writeIntBinary(stream.plain.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
type_arr->serializeOffsets(column, stream.compressed, prev_mark, limit);
prev_mark += limit;
}
}
{
ColumnStream & stream = *column_streams[name];
// TODO Для массивов здесь баг - засечки сериализуются неправильно.
size_t prev_mark = 0;
type.serializeBinary(column, stream.compressed,
boost::bind(&MergedBlockOutputStream::writeCallback, this,
boost::ref(prev_mark), boost::ref(stream.plain), boost::ref(stream.compressed), boost::ref(stream.marks)));
while (prev_mark < size)
{
size_t limit = 0;
/// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк.
if (prev_mark == 0 && index_offset != 0)
{
limit = index_offset;
}
else
{
limit = storage.index_granularity;
writeIntBinary(stream.plain.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
type.serializeBinary(column, stream.compressed, prev_mark, limit);
prev_mark += limit;
}
}
}
/// Вызывается каждые index_granularity строк и пишет в файл с засечками (.mrk).
size_t writeCallback(size_t & prev_mark,
WriteBufferFromFile & plain,
CompressedWriteBuffer & compressed,
WriteBufferFromFile & marks)
{
/// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк.
if (prev_mark == 0 && index_offset != 0)
{
prev_mark = index_offset;
return prev_mark;
}
/// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока)
writeIntBinary(plain.count(), marks);
writeIntBinary(compressed.offset(), marks);
prev_mark += storage.index_granularity;
return prev_mark;
}
};
typedef Poco::SharedPtr<MergedBlockOutputStream> MergedBlockOutputStreamPtr;