dbms: development.

This commit is contained in:
Alexey Milovidov 2010-06-04 19:06:32 +00:00
parent 9e053a93f3
commit 75545a34e1
6 changed files with 54 additions and 51 deletions

View File

@ -26,9 +26,6 @@ public:
/// Клонировать /// Клонировать
virtual SharedPtr<IDataType> clone() const = 0; virtual SharedPtr<IDataType> clone() const = 0;
/** Предполагается, что проверка статуса stream-ов производится вызывающей стороной.
*/
/** Бинарная сериализация - для сохранения на диск / в сеть и т. п. /** Бинарная сериализация - для сохранения на диск / в сеть и т. п.
* Обратите внимание, что присутствует по два вида методов * Обратите внимание, что присутствует по два вида методов
* - для работы с единичными значениями и целыми столбцами. * - для работы с единичными значениями и целыми столбцами.

View File

@ -34,7 +34,7 @@ public:
void deserializeBinary(Field & field, ReadBuffer & istr) const void deserializeBinary(Field & field, ReadBuffer & istr) const
{ {
typename ColumnType::value_type x; typename ColumnType::value_type x;
istr.read(reinterpret_cast<char *>(&x), sizeof(x)); istr.readStrict(reinterpret_cast<char *>(&x), sizeof(x));
field = typename NearestFieldType<FieldType>::Type(x); field = typename NearestFieldType<FieldType>::Type(x);
} }

View File

@ -36,9 +36,7 @@ public:
/** Читает и разжимает следующий кусок сжатых данных. */ /** Читает и разжимает следующий кусок сжатых данных. */
void readCompressedChunk() void readCompressedChunk()
{ {
size_t size = in.read(&compressed_buffer[0], QUICKLZ_HEADER_SIZE); in.readStrict(&compressed_buffer[0], QUICKLZ_HEADER_SIZE);
if (size != QUICKLZ_HEADER_SIZE)
throw Exception("Cannot read size of compressed chunk", ErrorCodes::CANNOT_READ_SIZE_OF_COMPRESSED_CHUNK);
size_t size_compressed = qlz_size_compressed(internal_buffer); size_t size_compressed = qlz_size_compressed(internal_buffer);
size_t size_decompressed = qlz_size_decompressed(internal_buffer); size_t size_decompressed = qlz_size_decompressed(internal_buffer);
@ -46,9 +44,7 @@ public:
compressed_buffer.resize(size_compressed); compressed_buffer.resize(size_compressed);
decompressed_buffer.resize(size_decompressed); decompressed_buffer.resize(size_decompressed);
size = in.read(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE); in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
if (size != size_compressed - QUICKLZ_HEADER_SIZE)
throw Exception("Cannot read compressed chunk", ErrorCodes::CANNOT_READ_COMPRESSED_CHUNK);
pos_in_buffer = 0; pos_in_buffer = 0;
} }

View File

@ -82,6 +82,7 @@ public:
size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied); size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
std::memcpy(to, pos, bytes_to_copy); std::memcpy(to, pos, bytes_to_copy);
pos += bytes_to_copy; pos += bytes_to_copy;
bytes_copied += bytes_to_copy;
} }
return bytes_copied; return bytes_copied;

View File

@ -33,7 +33,7 @@ void DataTypeString::deserializeBinary(Field & field, ReadBuffer & istr) const
String & s = boost::get<String>(field); String & s = boost::get<String>(field);
s.resize(size); s.resize(size);
/// непереносимо, но (действительно) быстрее /// непереносимо, но (действительно) быстрее
istr.read(const_cast<char*>(s.data()), size); istr.readStrict(const_cast<char*>(s.data()), size);
} }
@ -47,12 +47,12 @@ void DataTypeString::serializeBinary(const IColumn & column, WriteBuffer & ostr)
if (!size) if (!size)
return; return;
writeVarUInt(offsets[0], ostr); writeVarUInt(offsets[0] - 1, ostr);
ostr.write(reinterpret_cast<const char *>(&data[0]), offsets[0]); ostr.write(reinterpret_cast<const char *>(&data[0]), offsets[0] - 1);
for (size_t i = 1; i < size; ++i) for (size_t i = 1; i < size; ++i)
{ {
UInt64 str_size = offsets[i] - offsets[i - 1]; UInt64 str_size = offsets[i] - offsets[i - 1] - 1;
writeVarUInt(str_size, ostr); writeVarUInt(str_size, ostr);
ostr.write(reinterpret_cast<const char *>(&data[offsets[i - 1]]), str_size); ostr.write(reinterpret_cast<const char *>(&data[offsets[i - 1]]), str_size);
} }
@ -83,7 +83,8 @@ void DataTypeString::deserializeBinary(IColumn & column, ReadBuffer & istr, size
if (data.size() < offset) if (data.size() < offset)
data.resize(offset); data.resize(offset);
istr.read(reinterpret_cast<char*>(&data[offset - size]), sizeof(ColumnUInt8::value_type) * size); istr.readStrict(reinterpret_cast<char*>(&data[offset - size]), sizeof(ColumnUInt8::value_type) * size);
data[offset - 1] = 0;
} }
} }

View File

@ -14,50 +14,58 @@
int main(int argc, char ** argv) int main(int argc, char ** argv)
{ {
Poco::Stopwatch stopwatch; try
size_t n = 10000000;
const char * s = "Hello, world!";
size_t size = strlen(s) + 1;
DB::DataTypeString data_type;
{ {
Poco::SharedPtr<DB::ColumnString> column = new DB::ColumnString(); Poco::Stopwatch stopwatch;
DB::ColumnUInt8::Container_t & data = dynamic_cast<DB::ColumnUInt8 &>(column->getData()).getData(); size_t n = 10000000;
DB::ColumnArray::Offsets_t & offsets = column->getOffsets(); const char * s = "abcdefghi";
size_t size = strlen(s) + 1;
data.resize(n * size); DB::DataTypeString data_type;
offsets.resize(n);
for (size_t i = 0; i < n; ++i)
{ {
memcpy(&data[i * size], s, size); Poco::SharedPtr<DB::ColumnString> column = new DB::ColumnString();
offsets[i] = (i + 1) * size; DB::ColumnUInt8::Container_t & data = dynamic_cast<DB::ColumnUInt8 &>(column->getData()).getData();
DB::ColumnArray::Offsets_t & offsets = column->getOffsets();
data.resize(n * size);
offsets.resize(n);
for (size_t i = 0; i < n; ++i)
{
memcpy(&data[i * size], s, size);
offsets[i] = (i + 1) * size;
}
std::ofstream ostr("test");
DB::WriteBufferFromOStream out_buf(ostr);
stopwatch.restart();
data_type.serializeBinary(*column, out_buf);
stopwatch.stop();
std::cout << "Writing, elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
} }
std::ofstream ostr("test"); {
DB::WriteBufferFromOStream out_buf(ostr); Poco::SharedPtr<DB::ColumnString> column = new DB::ColumnString();
stopwatch.restart(); std::ifstream istr("test");
data_type.serializeBinary(*column, out_buf); DB::ReadBufferFromIStream in_buf(istr);
stopwatch.stop();
std::cout << "Writing, elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl; stopwatch.restart();
data_type.deserializeBinary(*column, in_buf, n);
stopwatch.stop();
std::cout << "Reading, elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
std::cout << std::endl
<< boost::get<DB::String>((*column)[0]) << std::endl
<< boost::get<DB::String>((*column)[n - 1]) << std::endl;
}
} }
catch (const DB::Exception & e)
{ {
Poco::SharedPtr<DB::ColumnString> column = new DB::ColumnString(); std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
std::ifstream istr("test");
DB::ReadBufferFromIStream in_buf(istr);
stopwatch.restart();
data_type.deserializeBinary(*column, in_buf, n);
stopwatch.stop();
std::cout << "Reading, elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
std::cout << std::endl
<< boost::get<DB::String>((*column)[0]) << std::endl
<< boost::get<DB::String>((*column)[n - 1]) << std::endl;
} }
return 0; return 0;