dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-10-31 06:37:12 +00:00
parent 69ce664c9c
commit b1b0d3836a
27 changed files with 187 additions and 101 deletions

View File

@ -26,7 +26,8 @@ struct ColumnWithNameAndType
res.name = name;
res.type = type;
res.column = column->cloneEmpty();
if (column)
res.column = column->cloneEmpty();
return res;
}

View File

@ -1,5 +1,4 @@
#ifndef DBMS_CORE_COLUMN_WITH_NAME_AND_TYPE_H
#define DBMS_CORE_COLUMN_WITH_NAME_AND_TYPE_H
#pragma once
#include <vector>
@ -12,5 +11,3 @@ namespace DB
typedef std::vector<ColumnWithNameAndType> ColumnsWithNameAndType;
}
#endif

View File

@ -1,6 +1,5 @@
#ifndef DBMS_CORE_DEFINES_H
#define DBMS_CORE_DEFINES_H
#pragma once
#define DEFAULT_BLOCK_SIZE 1048576
#endif
#define DEFAULT_MAX_QUERY_SIZE 1048576
#define SHOW_CHARS_ON_SYNTAX_ERROR 160L

View File

@ -83,6 +83,8 @@ namespace ErrorCodes
CANNOT_WRITE_TO_FILE_DESCRIPTOR,
CANNOT_OPEN_FILE,
CANNOT_CLOSE_FILE,
UNKNOWN_TYPE_OF_QUERY,
INCORRECT_FILE_NAME,
};
}

View File

@ -1,5 +1,7 @@
#pragma once
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>

View File

@ -25,6 +25,11 @@ public:
*/
virtual Block read() = 0;
/** Прочитать что-нибудь перед началом всех данных или после конца всех данных.
*/
virtual void readPrefix() {}
virtual void readSuffix() {}
virtual ~IBlockInputStream() {}
/** Для вывода дерева преобразований потока данных (плана выполнения запроса).

View File

@ -21,6 +21,11 @@ public:
*/
virtual void write(const Block & block) = 0;
/** Записать что-нибудь перед началом всех данных или после конца всех данных.
*/
virtual void writePrefix() {}
virtual void writeSuffix() {}
/** Создать копию объекта.
* Предполагается, что функция вызывается только до использования объекта (сразу после создания, до вызова других методов),
* только для того, чтобы можно было преобразовать параметр, переданный по ссылке в shared ptr.

View File

@ -17,6 +17,11 @@ public:
*/
virtual Row read() = 0;
/// Прочитать разделитель
virtual void readRowBetweenDelimiter() {}; /// разделитель между строками
virtual void readPrefix() {}; /// разделитель перед началом результата
virtual void readSuffix() {}; /// разделитель после конца результата
/** Создать копию объекта.
* Предполагается, что функция вызывается только до использования объекта (сразу после создания, до вызова других методов),
* только для того, чтобы можно было преобразовать параметр, переданный по ссылке в shared ptr.

View File

@ -22,10 +22,12 @@ public:
virtual void writeField(const Field & field) = 0;
/** Записать разделитель. */
virtual void writeFieldDelimiter() {};
virtual void writeRowStartDelimiter() {};
virtual void writeRowEndDelimiter() {};
virtual void writeRowBetweenDelimiter() {};
virtual void writeFieldDelimiter() {}; /// разделитель между значениями
virtual void writeRowStartDelimiter() {}; /// разделитель перед каждой строкой
virtual void writeRowEndDelimiter() {}; /// разделитель после каждой строки
virtual void writeRowBetweenDelimiter() {}; /// разделитель между строками
virtual void writePrefix() {}; /// разделитель перед началом результата
virtual void writeSuffix() {}; /// разделитель после конца результата
/** Создать копию объекта.
* Предполагается, что функция вызывается только до использования объекта (сразу после создания, до вызова других методов),

View File

@ -16,6 +16,9 @@ public:
explicit RowInputStreamFromBlockInputStream(BlockInputStreamPtr block_input_);
Row read();
void readPrefix() { block_input->readPrefix(); };
void readSuffix() { block_input->readSuffix(); };
RowInputStreamPtr clone() { return new RowInputStreamFromBlockInputStream(block_input); }
private:

View File

@ -1,5 +1,7 @@
#pragma once
#include <vector>
#include <DB/IO/ReadBuffer.h>
@ -10,13 +12,25 @@ namespace DB
*/
class ConcatReadBuffer : public ReadBuffer
{
protected:
public:
typedef std::vector<ReadBuffer *> ReadBuffers;
protected:
ReadBuffers buffers;
ReadBuffers::iterator current;
bool nextImpl()
{
if (buffers.end() == current)
return false;
/// Первое чтение
if (working_buffer.size() == 0 && (*current)->position() != (*current)->buffer().end())
{
working_buffer = (*current)->buffer();
return true;
}
if (!(*current)->next())
{
++current;
@ -24,19 +38,17 @@ protected:
return false;
}
internal_buffer = (*current)->internal_buffer;
working_buffer = (*current)->working_buffer;
working_buffer = (*current)->buffer();
return true;
}
public:
ConcatReadBuffer(const ReadBuffers & buffers_) : buffers(buffers_), current(buffers.begin()) {}
ConcatReadBuffer(const ReadBuffers & buffers_) : ReadBuffer(NULL, 0), buffers(buffers_), current(buffers.begin()) {}
ConcatReadBuffer(ReadBuffer & buf1, ReadBuffer & buf2)
ConcatReadBuffer(ReadBuffer & buf1, ReadBuffer & buf2) : ReadBuffer(NULL, 0)
{
buffers.push_bask(&buf1);
buffers.push_bask(&buf2);
buffers.push_back(&buf1);
buffers.push_back(&buf2);
current = buffers.begin();
}
};

View File

@ -28,6 +28,7 @@ public:
virtual ~WriteBufferFromFile()
{
next();
if (0 != close(fd))
throwFromErrno("Cannot close file " + file_name, ErrorCodes::CANNOT_CLOSE_FILE);
}

View File

@ -26,7 +26,7 @@ protected:
{
if (!offset())
return;
ssize_t bytes_written = ::write(fd, working_buffer.begin(), offset());
if (-1 == bytes_written || 0 == bytes_written)
throwFromErrno("Cannot write to file " + getFileName(), ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);

View File

@ -1,5 +1,4 @@
#ifndef DBMS_PARSERS_COMMONPARSERS_H
#define DBMS_PARSERS_COMMONPARSERS_H
#pragma once
#include <string.h> /// strncmp, strncasecmp
@ -56,13 +55,18 @@ protected:
*/
class ParserWhiteSpace : public IParserBase
{
public:
ParserWhiteSpace(bool allow_newlines_ = true) : allow_newlines(allow_newlines_) {}
protected:
bool allow_newlines;
String getName() { return "white space"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected)
{
Pos begin = pos;
while (*pos == ' ' || *pos == '\t' || *pos == '\n' || *pos == '\r' || *pos == '\f')
while (*pos == ' ' || *pos == '\t' || (allow_newlines && *pos == '\n') || *pos == '\r' || *pos == '\f')
++pos;
return pos != begin;
@ -143,12 +147,17 @@ protected:
class ParserWhiteSpaceOrComments : public IParserBase
{
public:
ParserWhiteSpaceOrComments(bool allow_newlines_outside_comments_ = true) : allow_newlines_outside_comments(allow_newlines_outside_comments_) {}
protected:
bool allow_newlines_outside_comments;
String getName() { return "white space or comments"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected)
{
ParserWhiteSpace p1;
ParserWhiteSpace p1(allow_newlines_outside_comments);
ParserComment p2;
bool res = false;
@ -159,5 +168,3 @@ protected:
};
}
#endif

View File

@ -28,12 +28,15 @@ protected:
/** Запрос типа такого:
* CREATE|ATTACH TABLE [IF NOT EXISTS] name
* CREATE|ATTACH TABLE [IF NOT EXISTS] [db.]name
* (
* name1 type1,
* name2 type2,
* ...
* ) ENGINE = engine
*
* Или:
* CREATE TABLE [db.]name AS [db2.]name2
*/
class ParserCreateQuery : public IParserBase
{

View File

@ -13,7 +13,8 @@ namespace DB
* INSERT INTO table (c1, c2, c3) VALUES (v11, v12, v13), (v21, v22, v23), ...
* INSERT INTO table VALUES (v11, v12, v13), (v21, v22, v23), ...
*
* Вставка данных в произвольном формате. Сами данные идут после перевода строки.
* Вставка данных в произвольном формате.
* Сами данные идут после перевода строки, если он есть, или после всех пробельных символов, иначе.
* INSERT INTO table (c1, c2, c3) FORMAT format \n ...
* INSERT INTO table FORMAT format \n ...
*

View File

@ -38,6 +38,10 @@ public:
*/
virtual const NamesAndTypes & getColumns() const = 0;
/** То же самое, но в виде блока-образца.
*/
Block getSampleBlock() const;
/** Читать набор столбцов из таблицы.
* Принимает список столбцов, которых нужно прочитать, а также описание запроса,
* из которого может быть извлечена информация о том, каким способом извлекать данные

View File

@ -23,8 +23,12 @@ Block BlockInputStreamFromRowInputStream::readImpl()
{
Block res;
row_input->readPrefix();
for (size_t rows = 0; rows < max_block_size; ++rows)
{
if (rows != 0)
row_input->readRowBetweenDelimiter();
Row row = row_input->read();
if (row.empty())
@ -39,6 +43,7 @@ Block BlockInputStreamFromRowInputStream::readImpl()
for (size_t i = 0; i < row.size(); ++i)
res.getByPosition(i).column->insert(row[i]);
}
row_input->readSuffix();
return res;
}

View File

@ -12,6 +12,7 @@ void BlockOutputStreamFromRowOutputStream::write(const Block & block)
size_t rows = block.rows();
size_t columns = block.columns();
row_output->writePrefix();
for (size_t i = 0; i < rows; ++i)
{
if (i != 0)
@ -28,6 +29,7 @@ void BlockOutputStreamFromRowOutputStream::write(const Block & block)
row_output->writeRowEndDelimiter();
}
row_output->writeSuffix();
}
}

View File

@ -36,7 +36,7 @@ void ValuesRowOutputStream::writeRowStartDelimiter()
void ValuesRowOutputStream::writeRowEndDelimiter()
{
writeString(")\n", ostr);
writeChar(')', ostr);
}

View File

@ -9,20 +9,41 @@ namespace DB
void copyData(IBlockInputStream & from, IBlockOutputStream & to)
{
from.readPrefix();
to.writePrefix();
while (Block block = from.read())
to.write(block);
from.readSuffix();
to.writeSuffix();
}
void copyData(IRowInputStream & from, IRowOutputStream & to)
{
from.readPrefix();
to.writePrefix();
bool first = true;
while (1)
{
if (first)
first = false;
else
{
from.readRowBetweenDelimiter();
to.writeRowBetweenDelimiter();
}
Row row = from.read();
if (row.empty())
break;
to.write(row);
}
from.readSuffix();
to.writeSuffix();
}

View File

@ -71,11 +71,19 @@ StoragePtr InterpreterCreateQuery::execute(ASTPtr query, Context & context)
/// Проверка наличия метаданных таблицы на диске и создание метаданных
if (Poco::File(metadata_path).exists())
throw Exception("Metadata for table " + database_name + "." + table_name + " already exists.",
ErrorCodes::TABLE_METADATA_ALREADY_EXISTS);
Poco::FileOutputStream metadata_file(metadata_path);
metadata_file << String(create.range.first, create.range.second - create.range.first) << std::endl;
{
/** Запрос ATTACH TABLE может использоваться, чтобы создать в оперативке ссылку на уже существующую таблицу.
* Это используется, например, при загрузке сервера.
*/
if (!create.attach)
throw Exception("Metadata for table " + database_name + "." + table_name + " already exists.",
ErrorCodes::TABLE_METADATA_ALREADY_EXISTS);
}
else
{
Poco::FileOutputStream metadata_file(metadata_path);
metadata_file << String(create.range.first, create.range.second - create.range.first) << std::endl;
}
{
Poco::ScopedLock<Poco::FastMutex> lock(*context.mutex);

View File

@ -7,6 +7,7 @@
#include <Poco/Stopwatch.h>
#include <Poco/NumberParser.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/Storages/StorageLog.h>
@ -17,6 +18,7 @@
#include <DB/DataStreams/copyData.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Functions/FunctionsArithmetic.h>
#include <DB/Functions/FunctionsComparison.h>
@ -26,10 +28,8 @@
#include <DB/Functions/FunctionsDateTime.h>
#include <DB/Functions/FunctionsStringSearch.h>
#include <DB/Parsers/ParserSelectQuery.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Interpreters/executeQuery.h>
using Poco::SharedPtr;
@ -184,61 +184,31 @@ int main(int argc, char ** argv)
("notLike", new DB::FunctionNotLike)
;
context.path = "./";
context.aggregate_function_factory = new DB::AggregateFunctionFactory;
context.data_type_factory = new DB::DataTypeFactory;
(*context.databases)["default"]["hits"] = new DB::StorageLog("./", "hits", names_and_types_map, ".bin");
(*context.databases)["default"]["hits2"] = new DB::StorageLog("./", "hits2", names_and_types_map, ".bin");
(*context.databases)["default"]["hits3"] = new DB::StorageLog("./", "hits3", names_and_types_map, ".bin");
DB::loadMetadata(context);
(*context.databases)["default"]["hits"] = new DB::StorageLog("./data/default/", "hits", names_and_types_map, ".bin");
(*context.databases)["default"]["hits2"] = new DB::StorageLog("./data/default/", "hits2", names_and_types_map, ".bin");
(*context.databases)["default"]["hits3"] = new DB::StorageLog("./data/default/", "hits3", names_and_types_map, ".bin");
(*context.databases)["system"]["one"] = new DB::StorageSystemOne("one");
(*context.databases)["system"]["numbers"] = new DB::StorageSystemNumbers("numbers");
context.current_database = "default";
DB::ParserSelectQuery parser;
DB::ASTPtr ast;
std::string input;/* =
"SELECT "
" count(),"
" UniqID % 100,"
" UniqID % 100 * 2,"
" -1,"
" count(),"
" count() * count(),"
" sum(-OS + UserAgent + TraficSourceID + SearchEngineID) + 101,"
" SearchPhrase"
"FROM hits "
"WHERE SearchPhrase != '' "
"GROUP BY UniqID % 100, SearchPhrase "
"ORDER BY count() DESC "
"LIMIT 20";*/
std::stringstream str;
str << std::cin.rdbuf();
input = str.str();
DB::ReadBufferFromIStream in(std::cin);
DB::WriteBufferFromOStream out(std::cout);
DB::BlockInputStreamPtr query_plan;
std::string expected;
DB::executeQuery(in, out, context, query_plan);
const char * begin = input.data();
const char * end = begin + input.size();
const char * pos = begin;
bool parse_res = parser.parse(pos, end, ast, expected);
if (!parse_res || pos != end)
throw DB::Exception("Syntax error: failed at position "
+ Poco::NumberFormatter::format(pos - begin) + ": "
+ input.substr(pos - begin, 10)
+ ", expected " + (parse_res ? "end of data" : expected) + ".",
DB::ErrorCodes::SYNTAX_ERROR);
DB::formatAST(*ast, std::cerr);
std::cerr << std::endl;
/* std::cerr << ast->getTreeID() << std::endl;
*/
DB::WriteBufferFromOStream ob(std::cout);
DB::InterpreterSelectQuery interpreter(ast, context);
DB::BlockInputStreamPtr stream = interpreter.executeAndFormat(ob);
std::cerr << std::endl;
stream->dumpTree(std::cerr);
if (query_plan)
{
std::cerr << std::endl;
query_plan->dumpTree(std::cerr);
}
}
catch (const DB::Exception & e)
{

View File

@ -156,12 +156,15 @@ bool ParserCreateQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
if (!storage_p.parse(pos, end, storage, expected))
return false;
ws.ignore(pos, end);
ASTCreateQuery * query = new ASTCreateQuery(StringRange(begin, pos));
node = query;
query->attach = attach;
query->if_not_exists = if_not_exists;
query->database = dynamic_cast<ASTIdentifier &>(*database).name;
if (database)
query->database = dynamic_cast<ASTIdentifier &>(*database).name;
query->table = dynamic_cast<ASTIdentifier &>(*table).name;
query->columns = columns;
query->storage = storage;

View File

@ -79,14 +79,24 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
{
ws.ignore(pos, end);
data = pos;
pos = end;
}
else if (s_format.ignore(pos, end, expected))
{
ws.ignore(pos, end);
if (!name_p.parse(pos, end, format, expected))
return false;
ws.ignore(pos, end);
/// Данные начинаются после первого перевода строки, если такой есть, или после всех пробельных символов, иначе.
ParserWhiteSpaceOrComments ws_without_nl(false);
ws_without_nl.ignore(pos, end);
if (pos != end && *pos == '\n')
++pos;
data = pos;
pos = end;
}
else if (s_select.ignore(pos, end, expected))
{
@ -99,13 +109,18 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
expected = "VALUES or FORMAT or SELECT";
return false;
}
ASTInsertQuery * query = new ASTInsertQuery(StringRange(begin, pos));
node = query;
query->database = dynamic_cast<ASTIdentifier &>(*database).name;
if (database)
query->database = dynamic_cast<ASTIdentifier &>(*database).name;
query->table = dynamic_cast<ASTIdentifier &>(*table).name;
query->format = dynamic_cast<ASTIdentifier &>(*format).name;
if (format)
query->format = dynamic_cast<ASTIdentifier &>(*format).name;
query->columns = columns;
query->select = select;
query->data = data;

View File

@ -19,6 +19,24 @@ static std::string listOfColumns(const NamesAndTypes & available_columns)
}
Block IStorage::getSampleBlock() const
{
Block res;
const NamesAndTypes & names_and_types = getColumns();
for (NamesAndTypes::const_iterator it = names_and_types.begin(); it != names_and_types.end(); ++it)
{
ColumnWithNameAndType col;
col.name = it->first;
col.type = it->second;
col.column = col.type->createColumn();
res.insert(col);
}
return res;
}
void IStorage::check(const Names & column_names) const
{
const NamesAndTypes & available_columns = getColumns();

View File

@ -15,6 +15,8 @@ using Poco::SharedPtr;
LogBlockInputStream::LogBlockInputStream(size_t block_size_, const Names & column_names_, StorageLog & storage_)
: block_size(block_size_), column_names(column_names_), storage(storage_)
{
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
streams.insert(std::make_pair(*it, new Stream(storage.files[*it].path())));
}
@ -22,9 +24,6 @@ Block LogBlockInputStream::readImpl()
{
Block res;
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
streams.insert(std::make_pair(*it, new Stream(storage.files[*it].path())));
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
{
ColumnWithNameAndType column;
@ -44,18 +43,14 @@ Block LogBlockInputStream::readImpl()
LogBlockOutputStream::LogBlockOutputStream(StorageLog & storage_)
: storage(storage_)
{
for (NamesAndTypes::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
streams.insert(std::make_pair(it->first, new Stream(storage.files[it->first].path())));
}
void LogBlockOutputStream::write(const Block & block)
{
storage.check(block);
for (size_t i = 0; i < block.columns(); ++i)
{
const std::string & name = block.getByPosition(i).name;
streams.insert(std::make_pair(name, new Stream(storage.files[name].path())));
}
for (size_t i = 0; i < block.columns(); ++i)
{