dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-08-19 18:31:14 +00:00
parent 10861629ba
commit c972f291cb
13 changed files with 103 additions and 31 deletions

View File

@ -64,6 +64,7 @@ namespace ErrorCodes
STORAGE_DOESNT_ALLOW_PARAMETERS,
UNKNOWN_STORAGE,
TABLE_ALREADY_EXISTS,
TABLE_METADATA_ALREADY_EXISTS,
};
}

View File

@ -12,6 +12,9 @@
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/VarInt.h>
#define DEFAULT_MAX_STRING_SIZE 0x00FFFFFFULL
namespace DB
@ -65,6 +68,20 @@ inline void readFloatBinary(T & x, ReadBuffer & buf)
readBinary(x, buf);
}
inline void readStringBinary(std::string & s, DB::ReadBuffer & buf, size_t MAX_STRING_SIZE = DEFAULT_MAX_STRING_SIZE)
{
size_t size = 0;
DB::readVarUInt(size, buf);
if (size > MAX_STRING_SIZE)
throw Poco::Exception("Too large string size.");
s.resize(size);
buf.readStrict(const_cast<char *>(s.data()), size);
}
inline void readChar(char & x, ReadBuffer & buf)
{
if (!buf.eof())

View File

@ -13,6 +13,7 @@
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/VarInt.h>
#define WRITE_HELPERS_DEFAULT_FLOAT_PRECISION 6U
/// 20 цифр и знак
@ -52,6 +53,13 @@ inline void writeFloatBinary(T & x, WriteBuffer & buf)
}
inline void writeStringBinary(const std::string & s, DB::WriteBuffer & buf)
{
writeVarUInt(s.size(), buf);
buf.write(s.data(), s.size());
}
template <typename T>
void writeIntText(T x, WriteBuffer & buf)
{

View File

@ -3,6 +3,9 @@
#include <map>
#include <set>
#include <Poco/SharedPtr.h>
#include <Poco/Mutex.h>
#include <DB/Core/NamesAndTypes.h>
#include <DB/Storages/IStorage.h>
#include <DB/Functions/IFunction.h>
@ -12,6 +15,8 @@
namespace DB
{
using Poco::SharedPtr;
/// имя функции -> функция
typedef std::map<String, FunctionPtr> Functions;
@ -27,11 +32,15 @@ typedef std::map<String, Tables> Databases;
struct Context
{
String path; /// Путь к директории с данными, со слешем на конце.
Databases databases; /// Список БД и таблиц в них.
SharedPtr<Databases> databases; /// Список БД и таблиц в них.
String current_database; /// Текущая БД.
Functions functions; /// Обычные функции.
SharedPtr<Functions> functions; /// Обычные функции.
DataTypeFactory data_type_factory; /// Типы данных.
NamesAndTypes columns; /// Столбцы текущей обрабатываемой таблицы.
SharedPtr<Poco::FastMutex> mutex; /// Для доступа и модификации разделяемых объектов.
Context() : databases(new Databases), functions(new Functions), mutex(new Poco::FastMutex) {}
};

View File

@ -14,6 +14,7 @@ class ASTCreateQuery : public IAST
{
public:
bool attach; /// Запрос ATTACH TABLE, а не CREATE TABLE.
bool if_not_exists;
String name;
ASTPtr columns;
ASTPtr storage;

View File

@ -28,7 +28,7 @@ protected:
/** Запрос типа такого:
* CREATE TABLE name
* CREATE|ATTACH TABLE [IF NOT EXISTS] name
* (
* name1 type1,
* name2 type2,

View File

@ -49,7 +49,7 @@ public:
ASTPtr query,
size_t max_block_size = DEFAULT_BLOCK_SIZE)
{
throw Exception("Method read() is not supported by storage " + getName());
throw Exception("Method read() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Пишет данные в таблицу.
@ -59,7 +59,22 @@ public:
virtual SharedPtr<IBlockOutputStream> write(
ASTPtr query)
{
throw Exception("Method write() is not supported by storage " + getName());
throw Exception("Method write() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Удалить данные таблицы.
*/
virtual void drop()
{
throw Exception("Method drop() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** ALTER таблицы в виде изменения столбцов, не затрагивающий изменение Storage или его параметров.
* (ALTER, затрагивающий изменение движка, делается внешним кодом, путём копирования данных.)
*/
virtual void alter(SharedPtr<NamesAndTypes> columns)
{
throw Exception("Method alter() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual ~IStorage() {}

View File

@ -48,9 +48,9 @@ int main(int argc, char ** argv)
DB::Context context;
context.columns["number"] = new DB::DataTypeUInt64;
context.functions["plus"] = new DB::FunctionPlus;
context.functions["multiply"] = new DB::FunctionMultiply;
context.functions["divide"] = new DB::FunctionDivideFloating;
(*context.functions)["plus"] = new DB::FunctionPlus;
(*context.functions)["multiply"] = new DB::FunctionMultiply;
(*context.functions)["divide"] = new DB::FunctionDivideFloating;
Poco::SharedPtr<DB::Expression> expression = new DB::Expression(ast, context);

View File

@ -15,8 +15,8 @@ void Expression::addSemantic(ASTPtr ast)
{
if (ASTFunction * node = dynamic_cast<ASTFunction *>(&*ast))
{
Functions::const_iterator it = context.functions.find(node->name);
if (it == context.functions.end())
Functions::const_iterator it = context.functions->find(node->name);
if (it == context.functions->end())
throw Exception("Unknown function " + node->name, ErrorCodes::UNKNOWN_FUNCTION);
node->function = it->second;

View File

@ -18,12 +18,21 @@ StoragePtr InterpreterCreateQuery::execute(ASTPtr query, Context & context)
String database_name = context.current_database;
String table_name = create.name;
SharedPtr<NamesAndTypes> columns = new NamesAndTypes;
String data_path = context.path + "data/" + database_name + "/" + table_name + "/"; /// TODO: эскейпинг
String data_path = context.path + "data/" + database_name + "/"; /// TODO: эскейпинг
String metadata_path = context.path + "metadata/" + database_name + "/" + table_name + ".sql";
if (context.databases[database_name].end() != context.databases[database_name].find(table_name))
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
{
Poco::ScopedLock<Poco::FastMutex> lock(*context.mutex);
if ((*context.databases)[database_name].end() != (*context.databases)[database_name].find(table_name))
{
if (create.if_not_exists)
return (*context.databases)[database_name][table_name];
else
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
}
ASTExpressionList & columns_list = dynamic_cast<ASTExpressionList &>(*create.columns);
for (ASTs::iterator it = columns_list.children.begin(); it != columns_list.children.end(); ++it)
{
@ -57,18 +66,21 @@ StoragePtr InterpreterCreateQuery::execute(ASTPtr query, Context & context)
else
throw Exception("Unknown storage " + storage_str, ErrorCodes::UNKNOWN_STORAGE);
if (!create.attach)
/// Проверка наличия метаданных таблицы на диске и создание метаданных
if (Poco::File(metadata_path).exists())
throw Exception("Metadata for table " + database_name + "." + table_name + " already exists.",
ErrorCodes::TABLE_METADATA_ALREADY_EXISTS);
Poco::FileOutputStream metadata_file(metadata_path);
metadata_file << String(create.range.first, create.range.second - create.range.first) << std::endl;
{
/// Проверка наличия метаданных таблицы на диске и создание метаданных
Poco::ScopedLock<Poco::FastMutex> lock(*context.mutex);
if (Poco::File(metadata_path).exists())
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
Poco::FileOutputStream metadata_file(metadata_path);
metadata_file << String(create.range.first, create.range.second - create.range.first) << std::endl;
(*context.databases)[database_name][table_name] = res;
}
context.databases[database_name][table_name] = res;
return res;
}

View File

@ -13,7 +13,7 @@ int main(int argc, char ** argv)
{
DB::ParserCreateQuery parser;
DB::ASTPtr ast;
std::string input = "CREATE TABLE hits (\n"
std::string input = "CREATE TABLE IF NOT EXISTS hits (\n"
"WatchID UInt64,\n"
"JavaEnable UInt8,\n"
"Title String,\n"
@ -95,7 +95,7 @@ int main(int argc, char ** argv)
DB::Context context;
context.path = "./";
context.databases["test"];
(*context.databases)["test"];
context.current_database = "test";
DB::InterpreterCreateQuery interpreter;

View File

@ -100,8 +100,8 @@ int main(int argc, char ** argv)
DB::Context context;
context.columns["x"] = new DB::DataTypeInt16;
context.functions["plus"] = new DB::FunctionPlus;
context.functions["multiply"] = new DB::FunctionMultiply;
(*context.functions)["plus"] = new DB::FunctionPlus;
(*context.functions)["multiply"] = new DB::FunctionMultiply;
DB::Expression expression(ast, context);

View File

@ -78,6 +78,9 @@ bool ParserCreateQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
ParserString s_rparen(")");
ParserString s_engine("ENGINE", true);
ParserString s_eq("=");
ParserString s_if("IF", true);
ParserString s_not("NOT", true);
ParserString s_exists("EXISTS", true);
ParserIdentifier name_p;
ParserList columns_p(new ParserNameTypePair, new ParserString(","), false);
ParserIdentifierWithOptionalParameters storage_p;
@ -86,6 +89,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
ASTPtr columns;
ASTPtr storage;
bool attach = false;
bool if_not_exists = false;
ws.ignore(pos, end);
@ -97,12 +101,16 @@ bool ParserCreateQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
return false;
}
ws.ignore(pos, end);
if (!s_table.ignore(pos, end, expected))
if (!ws.ignore(pos, end) || !s_table.ignore(pos, end, expected) || !ws.ignore(pos, end))
return false;
ws.ignore(pos, end);
if (s_if.ignore(pos, end, expected)
&& ws.ignore(pos, end)
&& s_not.ignore(pos, end, expected)
&& ws.ignore(pos, end)
&& s_exists.ignore(pos, end, expected)
&& ws.ignore(pos, end))
if_not_exists = true;
if (!name_p.parse(pos, end, name, expected))
return false;
@ -141,6 +149,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
node = query;
query->attach = attach;
query->if_not_exists = if_not_exists;
query->name = dynamic_cast<ASTIdentifier &>(*name).name;
query->columns = columns;
query->storage = storage;