dbms: development.

This commit is contained in:
Alexey Milovidov 2010-05-13 16:13:38 +00:00
parent 8c8fe0439f
commit 6609ba4c1b
13 changed files with 271 additions and 29 deletions

View File

@ -22,6 +22,7 @@ public:
/// Returns the stored element count `s`.
size_t size() const { return s; }
/// Returns the single stored value `data`; the index n is not used.
Field operator[](size_t n) const { return data; }
/// Keeps `length` elements by updating the stored count; `start` is not used because only one value is stored.
void cut(size_t start, size_t length) { s = length; }
/// Resets the element count to zero; the stored value `data` is left untouched.
void clear() { s = 0; }
/** Более эффективные методы манипуляции */
/// Gives direct mutable access to the stored value.
T & getData() { return data; }
@ -31,6 +32,7 @@ public:
virtual SharedPtr<IColumn> convertToFullColumn() const = 0;
private:
size_t s;
T data;
};

View File

@ -1,16 +1,34 @@
#ifndef DBMS_CORE_COLUMN_STRING_H
#define DBMS_CORE_COLUMN_STRING_H
#include <DB/Core/Types.h>
#include <DB/Columns/ColumnVector.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnsNumber.h>
namespace DB
{
/** Столбец строк. */
/** Cтолбeц значений типа "строка".
* Отличается от массива UInt8 только получением элемента (в виде String, а не Array)
*/
/** Column of string values, stored like an array of UInt8:
  * one flat byte buffer plus, for every element, the offset of its end.
  * The only difference from the array is that an element is returned as a String.
  */
class ColumnString : public ColumnArray
{
public:
    /** Creates an empty string column whose nested column is a ColumnUInt8. */
    ColumnString()
        : ColumnArray(new ColumnUInt8())
    {
    }

    /** Returns the n-th string wrapped in a Field.
      * Element n occupies bytes [offsets[n - 1], offsets[n]) of the nested column
      * (starting at 0 for n == 0). No range check is done on n.
      */
    Field operator[](size_t n) const
    {
        const size_t begin = n == 0 ? 0 : offsets[n - 1];
        const size_t length = offsets[n] - begin;

        /// An empty element may start exactly at the end of the byte buffer
        /// (or the buffer may be empty), so it must not be indexed.
        if (length == 0)
            return String();

        const ColumnUInt8 & bytes = dynamic_cast<const ColumnUInt8 &>(*data);
        return String(reinterpret_cast<const char *>(&bytes.getData()[begin]), length);
    }
};
typedef ColumnVector<String> ColumnString;
}

View File

@ -17,7 +17,7 @@ using Poco::SharedPtr;
/** Столбец со значениями-кортежами.
*/
class ColumnTuple
class ColumnTuple : public IColumn
{
private:
typedef std::vector<SharedPtr<IColumn> > Container_t;
@ -62,6 +62,11 @@ public:
data[i]->cut(start, length);
}
/// Empties `data`, the vector of member-column pointers, so the tuple is left with no member columns.
/// NOTE(review): cut() above forwards to every member column instead of dropping them;
/// confirm that removing the members (rather than clearing each one) is what is wanted here.
void clear()
{
data.clear();
}
/// манипуляция с Tuple
void insertColumn(size_t pos, SharedPtr<IColumn> & column)

View File

@ -50,6 +50,11 @@ public:
}
}
/// Removes all values by calling clear() on the underlying container `data`.
void clear()
{
data.clear();
}
/** Более эффективные методы манипуляции */
Container_t & getData()
{

View File

@ -20,6 +20,9 @@ public:
/** Удалить всё кроме диапазона элементов */
virtual void cut(size_t start, size_t length) = 0;
/** Очистить */
virtual void clear() = 0;
virtual ~IColumn() {}
};

View File

@ -4,14 +4,4 @@
#include <strconvert/escape_manip.h>
#include <strconvert/unescape_manip.h>
namespace DB
{
typedef strconvert::escape_file escape;
typedef strconvert::unescape_file unescape;
}
#endif

View File

@ -4,14 +4,4 @@
#include <strconvert/escape_manip.h>
#include <strconvert/unescape_manip.h>
namespace DB
{
typedef strconvert::quote_fast quote;
typedef strconvert::unquote_fast unquote;
}
#endif

View File

@ -0,0 +1,44 @@
#ifndef DBMS_DATA_TYPES_DATATYPE_STRING_H
#define DBMS_DATA_TYPES_DATATYPE_STRING_H
#include <ostream>
#include <Poco/SharedPtr.h>
#include <DB/DataTypes/IDataType.h>
namespace DB
{
using Poco::SharedPtr;
/** Data type for string values.
  * Declares binary and text (plain, escaped, quoted) serialization of single Field values,
  * binary serialization of whole columns, and creation of the matching column.
  */
class DataTypeString : public IDataType
{
public:
/// Name of the type: "String".
std::string getName() const
{
return "String";
}
/// Binary form of one value: the length as VarUInt, followed by the bytes of the string.
void serializeBinary(const Field & field, std::ostream & ostr) const;
/// Reads one value in the format above; if the length cannot be read, `field` is left unchanged.
void deserializeBinary(Field & field, std::istream & istr) const;
/// Writes every string of the column as VarUInt length + bytes, in order.
void serializeBinary(const IColumn & column, std::ostream & ostr) const;
/// Reads at most `limit` strings into the column; stops early when no further length can be read.
void deserializeBinary(IColumn & column, std::istream & istr, size_t limit) const;
/// Plain text: the string is written / read with the stream operators << and >>.
void serializeText(const Field & field, std::ostream & ostr) const;
void deserializeText(Field & field, std::istream & istr) const;
/// Escaped text: same as plain text, through the strconvert escape_file / unescape_file manipulators.
void serializeTextEscaped(const Field & field, std::ostream & ostr) const;
void deserializeTextEscaped(Field & field, std::istream & istr) const;
/// Quoted text: through the strconvert quote_fast / unquote_fast manipulators.
/// NOTE(review): the visible implementation does not use `compatible`.
void serializeTextQuoted(const Field & field, std::ostream & ostr, bool compatible = false) const;
void deserializeTextQuoted(Field & field, std::istream & istr, bool compatible = false) const;
/// Creates a new, empty ColumnString.
SharedPtr<IColumn> createColumn() const;
};
}
#endif

View File

@ -43,7 +43,6 @@ public:
virtual void deserializeText(Field & field, std::istream & istr) const = 0;
/** Текстовая сериализация с эскейпингом, но без квотирования.
* Есть реализация по умолчанию, которая подходит почти для всех случаев.
*/
virtual void serializeTextEscaped(const Field & field, std::ostream & ostr) const = 0;
virtual void deserializeTextEscaped(Field & field, std::istream & istr) const = 0;

View File

@ -2,6 +2,7 @@
#include <DB/Common/CompressedOutputStream.h>
#include <iostream>
namespace DB
{
@ -40,6 +41,11 @@ int CompressingStreamBuf::writeToDevice(const char * buffer, std::streamsize len
length,
&scratch[0]);
std::cerr << "length: " << length << ", compressed_size: " << compressed_size << std::endl;
std::cerr.write(&buffer[0], 100);
std::cerr << std::endl;
std::cerr.write(&compressed_buffer[0], 100);
p_ostr->write(&compressed_buffer[0], compressed_size);
return static_cast<int>(length);
}

View File

@ -0,0 +1,137 @@
#include <Poco/SharedPtr.h>
#include <DB/Common/VarInt.h>
#include <DB/Common/QuoteManipulators.h>
#include <DB/Common/EscapeManipulators.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataTypes/DataTypeString.h>
namespace DB
{
using Poco::SharedPtr;
/** Writes one string value in binary form: length as VarUInt, then the raw bytes.
  * boost::get throws if `field` does not hold a String.
  */
void DataTypeString::serializeBinary(const Field & field, std::ostream & ostr) const
{
    const String & s = boost::get<String>(field);
    writeVarUInt(s.size(), ostr);

    /// Unformatted write: operator<< is a formatted output function that honours
    /// ostr.width()/fill() and could pad the payload, breaking the length prefix.
    ostr.write(s.data(), s.size());
}
/** Reads one string value in binary form: length as VarUInt, then that many bytes.
  * If the length cannot be read, `field` is left unchanged.
  * NOTE(review): a short read of the payload is not detected here, whereas the
  * column overload throws in that case — confirm whether this should throw too.
  */
void DataTypeString::deserializeBinary(Field & field, std::istream & istr) const
{
    UInt64 length;
    readVarUInt(length, istr);

    if (istr.good())
    {
        field = String("");
        String & target = boost::get<String>(field);
        target.resize(length);

        /// Not portable (writes through data()), but (really) faster.
        istr.read(const_cast<char*>(target.data()), length);
    }
}
void DataTypeString::serializeBinary(const IColumn & column, std::ostream & ostr) const
{
const ColumnArray & column_array = dynamic_cast<const ColumnArray &>(column);
const ColumnUInt8::Container_t & data = dynamic_cast<const ColumnUInt8 &>(column_array.getData()).getData();
const ColumnArray::Offsets_t & offsets = column_array.getOffsets();
size_t size = column_array.size();
if (!size)
return;
writeVarUInt(offsets[0], ostr);
ostr.write(reinterpret_cast<const char *>(&data[0]), offsets[0]);
for (size_t i = 1; i < size; ++i)
{
UInt64 str_size = offsets[i] - offsets[i - 1];
writeVarUInt(str_size, ostr);
ostr.write(reinterpret_cast<const char *>(&data[offsets[i - 1]]), str_size);
}
}
/** Reads at most `limit` strings (VarUInt length + bytes each) into the column.
  * Stops quietly when the next length cannot be read; throws Exception when a
  * length was read but the bytes of the string could not be read in full.
  * `column` must be a ColumnArray over ColumnUInt8 (dynamic_cast throws otherwise).
  * NOTE(review): offsets are counted from 0, which presumably assumes the column
  * is empty on entry — confirm against callers.
  */
void DataTypeString::deserializeBinary(IColumn & column, std::istream & istr, size_t limit) const
{
    ColumnArray & column_array = dynamic_cast<ColumnArray &>(column);
    ColumnUInt8::Container_t & data = dynamic_cast<ColumnUInt8 &>(column_array.getData()).getData();
    ColumnArray::Offsets_t & offsets = column_array.getOffsets();

    data.reserve(limit);
    offsets.reserve(limit);

    size_t offset = 0;
    for (size_t i = 0; i < limit; ++i)
    {
        UInt64 size = 0;
        readVarUInt(size, istr);
        if (!istr.good())
            break;

        offset += size;
        offsets.push_back(offset);

        /// An empty string has no payload; &data[offset - size] would index
        /// one past the end of the buffer (or into an empty buffer).
        if (size == 0)
            continue;

        if (data.size() < offset)
            data.resize(offset);

        istr.read(reinterpret_cast<char*>(&data[offset - size]), sizeof(ColumnUInt8::value_type) * size);
        if (!istr.good())
            throw Exception("Cannot read all data from stream", ErrorCodes::CANNOT_READ_DATA_FROM_ISTREAM);
    }
}
void DataTypeString::serializeText(const Field & field, std::ostream & ostr) const
{
ostr << boost::get<const String &>(field);
}
/** Reads a string from plain text into `field` using operator>>. */
void DataTypeString::deserializeText(Field & field, std::istream & istr) const
{
    /// `field` is an output: make sure it holds a String before taking a reference to it,
    /// otherwise boost::get<String &> throws boost::bad_get (deserializeBinary assigns a String as well).
    /// A Field that already holds a String is left as is, so the previous behaviour is kept for it.
    if (!boost::get<String>(&field))
        field = String("");

    istr >> boost::get<String &>(field);
}
void DataTypeString::serializeTextEscaped(const Field & field, std::ostream & ostr) const
{
ostr << strconvert::escape_file << boost::get<const String &>(field);
}
/** Reads a string into `field` through the strconvert::unescape_file manipulator. */
void DataTypeString::deserializeTextEscaped(Field & field, std::istream & istr) const
{
    /// `field` is an output: make sure it holds a String before taking a reference to it,
    /// otherwise boost::get<String &> throws boost::bad_get (deserializeBinary assigns a String as well).
    /// A Field that already holds a String is left as is, so the previous behaviour is kept for it.
    if (!boost::get<String>(&field))
        field = String("");

    istr >> strconvert::unescape_file >> boost::get<String &>(field);
}
void DataTypeString::serializeTextQuoted(const Field & field, std::ostream & ostr, bool compatible) const
{
ostr << strconvert::quote_fast << boost::get<const String &>(field);
}
/** Reads a string into `field` through the strconvert::unquote_fast manipulator.
  * The `compatible` flag is not used.
  */
void DataTypeString::deserializeTextQuoted(Field & field, std::istream & istr, bool compatible) const
{
    /// `field` is an output: make sure it holds a String before taking a reference to it,
    /// otherwise boost::get<String &> throws boost::bad_get (deserializeBinary assigns a String as well).
    /// A Field that already holds a String is left as is, so the previous behaviour is kept for it.
    if (!boost::get<String>(&field))
        field = String("");

    istr >> strconvert::unquote_fast >> boost::get<String &>(field);
}
/** Creates a new, empty ColumnString and hands it out as a shared IColumn pointer. */
SharedPtr<IColumn> DataTypeString::createColumn() const
{
    SharedPtr<IColumn> column(new ColumnString);
    return column;
}
}

View File

@ -0,0 +1,43 @@
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>
#include <Poco/SharedPtr.h>
#include <Poco/Stopwatch.h>
#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeString.h>
/** Micro-benchmark: fills a string column with n copies of the same string
  * and measures how long DataTypeString::serializeBinary takes to write it to /dev/null.
  * Returns 0 on success, 1 if the output stream cannot be opened.
  */
int main(int argc, char ** argv)
{
    Poco::SharedPtr<DB::ColumnString> column = new DB::ColumnString();
    DB::ColumnUInt8::Container_t & data = dynamic_cast<DB::ColumnUInt8 &>(column->getData()).getData();
    DB::ColumnArray::Offsets_t & offsets = column->getOffsets();
    DB::DataTypeString data_type;
    Poco::Stopwatch stopwatch;

    const size_t n = 10000000;
    const char * s = "Hello, world!";
    /// Every element is stored together with its terminating zero byte.
    const size_t size = std::strlen(s) + 1;

    data.resize(n * size);
    offsets.resize(n);
    for (size_t i = 0; i < n; ++i)
    {
        std::memcpy(&data[i * size], s, size);
        /// offsets[i] is the end position of element i in the byte buffer.
        offsets[i] = (i + 1) * size;
    }

    std::ofstream ostr("/dev/null");
    /// Without this check a failed open would make every write a no-op and the timing meaningless.
    if (!ostr)
    {
        std::cerr << "Cannot open /dev/null for writing" << std::endl;
        return 1;
    }

    stopwatch.restart();
    data_type.serializeBinary(*column, ostr);
    stopwatch.stop();

    /// Stopwatch::elapsed() is in microseconds; print seconds.
    std::cout << "Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
    return 0;
}

View File

@ -16,7 +16,7 @@ int main(int argc, char ** argv)
{
try
{
const size_t rows = 1000000;
const size_t rows = 10000000;
/// создаём таблицу с парой столбцов
@ -38,7 +38,7 @@ int main(int argc, char ** argv)
vec1.resize(rows);
for (size_t i = 0; i < rows; ++i)
vec1[i] = i;
vec1[i] = 'z';
block.insert(column1);
@ -50,7 +50,7 @@ int main(int argc, char ** argv)
vec2.resize(rows);
for (size_t i = 0; i < rows; ++i)
vec2[i] = i;
vec2[i] = 'x';
block.insert(column2);