diff --git a/dbms/include/DB/ColumnTypes/ColumnTypes.h b/dbms/include/DB/ColumnTypes/ColumnTypes.h new file mode 100644 index 00000000000..646dabd0c36 --- /dev/null +++ b/dbms/include/DB/ColumnTypes/ColumnTypes.h @@ -0,0 +1,20 @@ +#ifndef DBMS_COLUMN_TYPES_COLUMNTYPES_H +#define DBMS_COLUMN_TYPES_COLUMNTYPES_H + +#include + +#include + +#include + + +namespace DB +{ + +using Poco::SharedPtr; + +typedef std::vector > ColumnTypes; + +} + +#endif diff --git a/dbms/include/DB/Core/Block.h b/dbms/include/DB/Core/Block.h index 53dc881d403..e2ac32e1365 100644 --- a/dbms/include/DB/Core/Block.h +++ b/dbms/include/DB/Core/Block.h @@ -5,7 +5,7 @@ #include #include -#include +#include namespace DB @@ -24,7 +24,7 @@ public: typedef std::map IndexByName_t; private: - Container_t columns; + Container_t data; IndexByPosition_t index_by_position; IndexByName_t index_by_name; @@ -40,8 +40,15 @@ public: ColumnWithNameAndType & getByName(const std::string & name); const ColumnWithNameAndType & getByName(const std::string & name) const; - operator bool() { return !columns.empty(); } - operator!() { return columns.empty(); } + /** Возвращает количество строк в блоке. + * Заодно проверяет, что все столбцы кроме констант (которые содержат единственное значение), + * содержат одинаковое число значений. + */ + size_t rows() const; + size_t columns() const; + + operator bool() const { return !data.empty(); } + bool operator!() const { return data.empty(); } }; } diff --git a/dbms/include/DB/Core/Column.h b/dbms/include/DB/Core/Column.h index efe9ce1bb91..0012f22ceae 100644 --- a/dbms/include/DB/Core/Column.h +++ b/dbms/include/DB/Core/Column.h @@ -14,6 +14,7 @@ namespace DB { /** Типы данных для представления столбцов значений в оперативке. + * Столбец значений может быть представлен массивом значений или единичным значением, если столбец - константа. */ typedef std::vector UInt8Column; @@ -46,6 +47,36 @@ typedef boost::make_recursive_variant< typedef std::vector TupleColumn; /// Столбец значений типа "кортеж" - несколько столбцов произвольного типа typedef std::vector ArrayColumn; /// Столбец значений типа "массив" - столбец, значения в котором - массивы + +/** Возвращает количество значений в столбце + * TODO: поправить для tuple. + */ +class ColumnVisitorSize : public boost::static_visitor +{ +public: + template size_t operator() (const T & x) const { return x.size(); } +}; + + +/** Возвращает n-ый элемент столбца. + * TODO: поправить для tuple. + */ +class ColumnVisitorNthElement : public boost::static_visitor +{ +public: + ColumnVisitorNthElement(size_t n_) : n(n_) {} + + template Field operator() (const T & x) const + { + return x.size() == 1 + ? x[0] /// столбец - константа + : x[n]; + } +private: + size_t n; +}; + + } #endif diff --git a/dbms/include/DB/Core/ColumnNames.h b/dbms/include/DB/Core/ColumnNames.h new file mode 100644 index 00000000000..378f80e0f2d --- /dev/null +++ b/dbms/include/DB/Core/ColumnNames.h @@ -0,0 +1,15 @@ +#ifndef DBMS_CORE_COLUMN_NAMES_H +#define DBMS_CORE_COLUMN_NAMES_H + +#include +#include + + +namespace DB +{ + +typedef std::vector ColumnNames; + +} + +#endif diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index f8cf7bbf528..65f865a5e1c 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -14,6 +14,11 @@ namespace ErrorCodes UNEXPECTED_END_OF_FILE, CANNOT_READ_DATA_FROM_ISTREAM, CANNOT_PARSE_TEXT, + INCORRECT_NUMBER_OF_COLUMNS, + THERE_IS_NO_COLUMN, + SIZES_OF_COLUMNS_DOESNT_MATCH, + EMPTY_COLUMN_IN_BLOCK, + NOT_FOUND_COLUMN_IN_BLOCK, }; } diff --git a/dbms/include/DB/DataStreams/IBlockInputStream.h b/dbms/include/DB/DataStreams/IBlockInputStream.h index 4c4ac4f956a..f5b3c44bfc6 100644 --- a/dbms/include/DB/DataStreams/IBlockInputStream.h +++ b/dbms/include/DB/DataStreams/IBlockInputStream.h @@ -1,9 +1,5 @@ #ifndef DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H -#ifndef DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H - -#include - -#include +#define DBMS_DATA_STREAMS_IBLOCKINPUTSTREAM_H #include @@ -11,8 +7,6 @@ namespace DB { -using Poco::SharedPtr; - /** Интерфейс потока для чтения данных по блокам из БД. * Реляционные операции предполагается делать также реализациями этого интерфейса. */ diff --git a/dbms/include/DB/DataStreams/IBlockOutputStream.h b/dbms/include/DB/DataStreams/IBlockOutputStream.h new file mode 100644 index 00000000000..8f1d2b45910 --- /dev/null +++ b/dbms/include/DB/DataStreams/IBlockOutputStream.h @@ -0,0 +1,25 @@ +#ifndef DBMS_DATA_STREAMS_IBLOCKOUTPUTSTREAM_H +#define DBMS_DATA_STREAMS_IBLOCKOUTPUTSTREAM_H + +#include + + +namespace DB +{ + +/** Интерфейс потока для записи данных в БД или в сеть, или в консоль и т. п. + */ +class IBlockOutputStream +{ +public: + + /** Записать блок. + */ + virtual void write(const Block & block) = 0; + + virtual ~IBlockOutputStream() {} +}; + +} + +#endif diff --git a/dbms/include/DB/DataStreams/IRowInputStream.h b/dbms/include/DB/DataStreams/IRowInputStream.h new file mode 100644 index 00000000000..5a0d98c315f --- /dev/null +++ b/dbms/include/DB/DataStreams/IRowInputStream.h @@ -0,0 +1,26 @@ +#ifndef DBMS_DATA_STREAMS_IROWINPUTSTREAM_H +#define DBMS_DATA_STREAMS_IROWINPUTSTREAM_H + +#include + + +namespace DB +{ + +/** Интерфейс потока для чтения данных по строкам. + */ +class IRowInputStream +{ +public: + + /** Прочитать следующую строку. + * Если строк больше нет - вернуть пустую строку. + */ + virtual Row read() = 0; + + virtual ~IRowInputStream() {} +}; + +} + +#endif diff --git a/dbms/include/DB/DataStreams/IRowOutputStream.h b/dbms/include/DB/DataStreams/IRowOutputStream.h new file mode 100644 index 00000000000..f1b27144e03 --- /dev/null +++ b/dbms/include/DB/DataStreams/IRowOutputStream.h @@ -0,0 +1,34 @@ +#ifndef DBMS_DATA_STREAMS_IROWOUTPUTSTREAM_H +#define DBMS_DATA_STREAMS_IROWOUTPUTSTREAM_H + +#include + + +namespace DB +{ + +/** Интерфейс потока для записи данных по строкам (например, для вывода в консоль). + */ +class IRowOutputStream +{ +public: + + /** Записать строку. + * Есть реализация по умолчанию, которая использует методы для записи одиночных значений и разделителей. + */ + virtual void write(const Row & row); + + /** Записать значение. */ + virtual void writeField(const Field & field) = 0; + + /** Записать разделитель. */ + virtual void writeFieldDelimiter() {}; + virtual void writeRowStartDelimiter() {}; + virtual void writeRowEndDelimiter() {}; + + virtual ~IRowOutputStream() {} +}; + +} + +#endif diff --git a/dbms/include/DB/DataStreams/RowInputStreamFromBlockInputStream.h b/dbms/include/DB/DataStreams/RowInputStreamFromBlockInputStream.h new file mode 100644 index 00000000000..cf43c16753b --- /dev/null +++ b/dbms/include/DB/DataStreams/RowInputStreamFromBlockInputStream.h @@ -0,0 +1,34 @@ +#ifndef DBMS_DATA_STREAMS_ROWINPUTSTREAMFROMBLOCKINPUTSTREAM_H +#define DBMS_DATA_STREAMS_ROWINPUTSTREAMFROMBLOCKINPUTSTREAM_H + +#include + +#include +#include + + +namespace DB +{ + +using Poco::SharedPtr; + + +/** Преобразует поток для чтения данных по блокам в поток для чтения данных по строкам. + */ +class RowInputStreamFromBlockInputStream : public IRowInputStream +{ +public: + explicit RowInputStreamFromBlockInputStream(IBlockInputStream & block_input_); + Row read(); + +private: + IBlockInputStream & block_input; + + size_t pos; + size_t current_rows; + Block current_block; +}; + +} + +#endif diff --git a/dbms/include/DB/DataStreams/TabSeparatedRowOutputStream.h b/dbms/include/DB/DataStreams/TabSeparatedRowOutputStream.h new file mode 100644 index 00000000000..5afe53d23b4 --- /dev/null +++ b/dbms/include/DB/DataStreams/TabSeparatedRowOutputStream.h @@ -0,0 +1,37 @@ +#ifndef DBMS_DATA_STREAMS_TABSEPARATEDROWOUTPUTSTREAM_H +#define DBMS_DATA_STREAMS_TABSEPARATEDROWOUTPUTSTREAM_H + +#include + +#include + +#include +#include + + +namespace DB +{ + +using Poco::SharedPtr; + + +/** Интерфейс потока для вывода данных в формате tsv. + */ +class TabSeparatedRowOutputStream : public IRowOutputStream +{ +public: + TabSeparatedRowOutputStream(std::ostream & ostr_, SharedPtr column_types_); + + void writeField(const Field & field); + void writeFieldDelimiter(); + void writeRowEndDelimiter(); + +private: + std::ostream & ostr; + SharedPtr column_types; + size_t field_number; +}; + +} + +#endif diff --git a/dbms/include/DB/DataStreams/copyData.h b/dbms/include/DB/DataStreams/copyData.h new file mode 100644 index 00000000000..3c10a388348 --- /dev/null +++ b/dbms/include/DB/DataStreams/copyData.h @@ -0,0 +1,22 @@ +#ifndef DBMS_DATA_STREAMS_COPY_DATA_H +#define DBMS_DATA_STREAMS_COPY_DATA_H + +#include +#include +#include +#include + + +namespace DB +{ + +/** Копирует данные из InputStream в OutputStream + * (например, из БД в консоль и т. п.) + */ +void copyData(IBlockInputStream & from, IBlockOutputStream & to); +void copyData(IRowInputStream & from, IRowOutputStream & to); +void copyData(IBlockInputStream & from, IRowOutputStream & to); + +} + +#endif diff --git a/dbms/include/DB/Storages/IStorage.h b/dbms/include/DB/Storages/IStorage.h index 6887d8d343d..74ccad30c53 100644 --- a/dbms/include/DB/Storages/IStorage.h +++ b/dbms/include/DB/Storages/IStorage.h @@ -1,12 +1,22 @@ #ifndef DBMS_STORAGES_ISTORAGE_H #define DBMS_STORAGES_ISTORAGE_H +//#include + #include +#include +#include +#include + +#define DEFAULT_BLOCK_SIZE 1048576 + namespace DB { +typedef char ptree; /// временная заглушка, вместо boost::property_tree::ptree +//using boost::property_tree::ptree; using Poco::SharedPtr; /** Хранилище. Отвечает за: @@ -18,27 +28,23 @@ using Poco::SharedPtr; */ class IStorage { -private: - /** Установить указатель на таблицу и кол-группу. - * - часть инициализации, которая выполняется при инициализации таблицы. - * (инициализация хранилища выполняется в два шага: - * 1 - конструктор, - * 2 - добавление к таблице (выполняется в конструкторе Table)) - */ - virtual void addToTable(Table * table_, ColumnGroup * column_group_) = 0; - public: - /** Прочитать данные, соответствующие точному значению ключа или префиксу. + /// Основное имя типа таблицы (например, StorageWithoutKey). + virtual std::string getName() const = 0; + + /** Читать набор столбцов из таблицы. + * Принимает список столбцов, которых нужно прочитать, а также описание запроса, + * из которого может быть извлечена информация о том, каким способом извлекать данные + * (индексы, блокировки и т. п.) * Возвращает объект, с помощью которого можно последовательно читать данные. */ - virtual Poco::SharedPtr read(const Row & key) = 0; - - /** Записать пачку данных в таблицу, обновляя существующие данные, если они есть. - * @param data - набор данных вида ключ (набор столбцов) -> значение (набор столбцов) - * @param mask - битовая маска - какие столбцы входят в кол-группу, - * которую хранит это хранилище - */ - virtual void merge(const AggregatedRowSet & data, const ColumnMask & mask) = 0; + virtual SharedPtr read( + const ColumnNames & column_names, + const ptree & query, + size_t max_block_size = DEFAULT_BLOCK_SIZE) + { + throw Exception("Method read() is not supported by storage " + getName()); + } virtual ~IStorage() {} }; diff --git a/dbms/include/DB/Storages/StorageSystemNumbers.h b/dbms/include/DB/Storages/StorageSystemNumbers.h new file mode 100644 index 00000000000..7c6d2b5b661 --- /dev/null +++ b/dbms/include/DB/Storages/StorageSystemNumbers.h @@ -0,0 +1,43 @@ +#ifndef DBMS_STORAGES_STORAGE_SYSTEM_NUMBERS_H +#define DBMS_STORAGES_STORAGE_SYSTEM_NUMBERS_H + +#include + +#include + + +namespace DB +{ + +using Poco::SharedPtr; + + +class NumbersBlockInputStream : public IBlockInputStream +{ +public: + NumbersBlockInputStream(size_t block_size_); + Block read(); +private: + size_t block_size; + UInt64 next; +}; + + +/** Реализует хранилище для системной таблицы Numbers. + * Таблица содержит единственный столбец number UInt64. + * Из этой таблицы можно прочитать все натуральные числа, начиная с 0 (до 2^64 - 1, а потом заново). + */ +class StorageSystemNumbers : public IStorage +{ +public: + std::string getName() const { return "SystemNumbers"; } + + SharedPtr read( + const ColumnNames & column_names, + const ptree & query, + size_t max_block_size = DEFAULT_BLOCK_SIZE); +}; + +} + +#endif diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 71ae431b68c..9ba2f1a5cbb 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -1,3 +1,6 @@ +#include +#include + #include @@ -6,16 +9,16 @@ namespace DB void Block::rebuildIndexByPosition() { - index_by_position.resize(columns.size()); + index_by_position.resize(data.size()); size_t pos = 0; - for (Container_t::iterator it = columns.begin(); it != columns.end(); ++it, ++pos) + for (Container_t::iterator it = data.begin(); it != data.end(); ++it, ++pos) index_by_position[pos] = it; } void Block::insert(size_t position, const ColumnWithNameAndType & elem) { - Container_t::iterator it = columns.insert(index_by_position[position], elem); + Container_t::iterator it = data.insert(index_by_position[position], elem); rebuildIndexByPosition(); index_by_name[elem.name] = it; } @@ -25,34 +28,63 @@ void Block::erase(size_t position) { Container_t::iterator it = index_by_position[position]; index_by_name.erase(index_by_name.find(it->name)); - columns.erase(it); + data.erase(it); rebuildIndexByPosition(); } -Block::ColumnWithNameAndType & Block::getByPosition(size_t position) +ColumnWithNameAndType & Block::getByPosition(size_t position) { return *index_by_position[position]; } -const Block::ColumnWithNameAndType & Block::getByPosition(size_t position) const +const ColumnWithNameAndType & Block::getByPosition(size_t position) const { return *index_by_position[position]; } -Block::ColumnWithNameAndType & Block::getByName(const std::string & name) +ColumnWithNameAndType & Block::getByName(const std::string & name) { return *index_by_name[name]; } -const Block::ColumnWithNameAndType & Block::getByName(const std::string & name) const +const ColumnWithNameAndType & Block::getByName(const std::string & name) const { - return *index_by_name[name]; + IndexByName_t::const_iterator it = index_by_name.find(name); + if (index_by_name.end() == it) + throw Exception("Not found column " + name + " in block.", ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK); + + return *it->second; +} + + +size_t Block::rows() const +{ + size_t res = 0; + ColumnVisitorSize visitor; + for (Container_t::const_iterator it = data.begin(); it != data.end(); ++it) + { + size_t size = boost::apply_visitor(visitor, *it->column); + + if (size == 0) + throw Exception("Empty column in block.", ErrorCodes::EMPTY_COLUMN_IN_BLOCK); + + if (size != 1 && res != 0 && size != res) + throw Exception("Sizes of columns doesn't match.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); + + res = size; + } + + return res; +} + + +size_t Block::columns() const +{ + return data.size(); } } - -#endif diff --git a/dbms/src/DataStreams/IRowOutputStream.cpp b/dbms/src/DataStreams/IRowOutputStream.cpp new file mode 100644 index 00000000000..cf660b0b3f2 --- /dev/null +++ b/dbms/src/DataStreams/IRowOutputStream.cpp @@ -0,0 +1,24 @@ +#include + + +namespace DB +{ + +void IRowOutputStream::write(const Row & row) +{ + writeRowStartDelimiter(); + + if (!row.empty()) + { + writeField(row[0]); + for (size_t i = 1; i < row.size(); ++i) + { + writeFieldDelimiter(); + writeField(row[i]); + } + } + + writeRowEndDelimiter(); +} + +} diff --git a/dbms/src/DataStreams/RowInputStreamFromBlockInputStream.cpp b/dbms/src/DataStreams/RowInputStreamFromBlockInputStream.cpp new file mode 100644 index 00000000000..00438213230 --- /dev/null +++ b/dbms/src/DataStreams/RowInputStreamFromBlockInputStream.cpp @@ -0,0 +1,51 @@ +#include + + +namespace DB +{ + +using Poco::SharedPtr; + + +RowInputStreamFromBlockInputStream::RowInputStreamFromBlockInputStream(IBlockInputStream & block_input_) + : block_input(block_input_), pos(0), current_rows(0) +{ +} + +class TestVisitor : public boost::static_visitor +{ +public: + template UInt64 operator() (const T & x) const + { + return 0; + } +}; + + +Row RowInputStreamFromBlockInputStream::read() +{ +/* if (pos >= current_rows) + { + current_block = block_input.read(); + current_rows = current_block.rows(); + pos = 0; + } + + ColumnVisitorNthElement visitor(pos); + size_t columns = current_block.columns(); + Row row(columns); + + for (size_t i = 0; i < columns; ++i) + row[i] = boost::apply_visitor(visitor, *current_block.getByPosition(i).column); + + return row; +*/ + + Column column = UInt64Column(0); + Field field = boost::apply_visitor(TestVisitor(), column); + + Row row; + return row; +} + +} diff --git a/dbms/src/DataStreams/TabSeparatedRowOutputStream.cpp b/dbms/src/DataStreams/TabSeparatedRowOutputStream.cpp new file mode 100644 index 00000000000..8b7c7adbad2 --- /dev/null +++ b/dbms/src/DataStreams/TabSeparatedRowOutputStream.cpp @@ -0,0 +1,35 @@ +#include + + +namespace DB +{ + +using Poco::SharedPtr; + + +TabSeparatedRowOutputStream::TabSeparatedRowOutputStream(std::ostream & ostr_, SharedPtr column_types_) + : ostr(ostr_), column_types(column_types_), field_number(0) +{ +} + + +void TabSeparatedRowOutputStream::writeField(const Field & field) +{ + column_types->at(field_number)->serializeTextEscaped(field, ostr); + ++field_number; +} + + +void TabSeparatedRowOutputStream::writeFieldDelimiter() +{ + ostr.put('\t'); +} + + +void TabSeparatedRowOutputStream::writeRowEndDelimiter() +{ + ostr.put('\n'); + field_number = 0; +} + +} diff --git a/dbms/src/DataStreams/copyData.cpp b/dbms/src/DataStreams/copyData.cpp new file mode 100644 index 00000000000..6d5828af52a --- /dev/null +++ b/dbms/src/DataStreams/copyData.cpp @@ -0,0 +1,34 @@ +#include + +#include + + +namespace DB +{ + +void copyData(IBlockInputStream & from, IBlockOutputStream & to) +{ + while (Block block = from.read()) + to.write(block); +} + + +void copyData(IRowInputStream & from, IRowOutputStream & to) +{ + while (1) + { + Row row = from.read(); + if (row.empty()) + break; + to.write(row); + } +} + + +void copyData(IBlockInputStream & from, IRowOutputStream & to) +{ + RowInputStreamFromBlockInputStream row_input(from); + copyData(row_input, to); +} + +} diff --git a/dbms/src/Storages/StorageSystemNumbers.cpp b/dbms/src/Storages/StorageSystemNumbers.cpp new file mode 100644 index 00000000000..ec1d1b97aff --- /dev/null +++ b/dbms/src/Storages/StorageSystemNumbers.cpp @@ -0,0 +1,51 @@ +#include + +#include +#include +#include +#include + + +namespace DB +{ + +using Poco::SharedPtr; + + +NumbersBlockInputStream::NumbersBlockInputStream(size_t block_size_) : block_size(block_size_), next(0) +{ +} + + +Block NumbersBlockInputStream::read() +{ + Block res; + res.insert(0, ColumnWithNameAndType()); + ColumnWithNameAndType & column_with_name_and_type = res.getByPosition(0); + column_with_name_and_type.name = "number"; + column_with_name_and_type.type = new ColumnTypeUInt64(); + column_with_name_and_type.column = new Column; + *column_with_name_and_type.column = UInt64Column(block_size); + UInt64Column & vec = boost::get(*column_with_name_and_type.column); + + for (size_t i = 0; i < block_size; ++i) + vec[i] = next++; + + return res; +} + + +SharedPtr StorageSystemNumbers::read( + const ColumnNames & column_names, const ptree & query, size_t max_block_size) +{ + if (column_names.size() != 1) + throw Exception("Incorrect number of columns.", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS); + + if (column_names[0] != "number") + throw Exception("There is no column " + column_names[0] + " in table System.Numbers.", + ErrorCodes::THERE_IS_NO_COLUMN); + + return new NumbersBlockInputStream(max_block_size); +} + +}