ClickHouse/dbms/src/Interpreters/InterpreterCreateQuery.cpp

181 lines
6.7 KiB
C++
Raw Normal View History

2011-11-05 23:31:19 +00:00
#include <Poco/File.h>
2011-10-24 12:10:59 +00:00
#include <Poco/FileStream.h>
2011-11-05 23:31:19 +00:00
#include <DB/Common/escapeForFileName.h>
2011-11-01 17:57:37 +00:00
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h>
2011-11-06 06:22:52 +00:00
#include <DB/DataStreams/MaterializingBlockInputStream.h>
2011-11-01 15:16:04 +00:00
#include <DB/DataStreams/copyData.h>
2011-08-18 20:33:20 +00:00
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Storages/StorageLog.h>
#include <DB/Storages/StorageSystemNumbers.h>
2011-11-01 15:16:04 +00:00
#include <DB/Interpreters/InterpreterSelectQuery.h>
2011-08-18 20:33:20 +00:00
#include <DB/Interpreters/InterpreterCreateQuery.h>
namespace DB
{
2012-02-27 06:28:20 +00:00
InterpreterCreateQuery::InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_)
2011-11-01 15:16:04 +00:00
{
}
StoragePtr InterpreterCreateQuery::execute()
2011-08-18 20:33:20 +00:00
{
2011-11-05 23:31:19 +00:00
Poco::ScopedLock<Poco::FastMutex> lock(*context.mutex);
2011-11-01 15:16:04 +00:00
ASTCreateQuery & create = dynamic_cast<ASTCreateQuery &>(*query_ptr);
2011-08-18 20:33:20 +00:00
2011-10-30 05:19:41 +00:00
String database_name = create.database.empty() ? context.current_database : create.database;
2011-11-05 23:31:19 +00:00
String database_name_escaped = escapeForFileName(database_name);
2011-10-30 05:19:41 +00:00
String table_name = create.table;
2011-11-05 23:31:19 +00:00
String table_name_escaped = escapeForFileName(table_name);
2011-10-31 17:30:44 +00:00
String as_database_name = create.as_database.empty() ? context.current_database : create.as_database;
String as_table_name = create.as_table;
2011-11-01 17:12:11 +00:00
NamesAndTypesListPtr columns = new NamesAndTypesList;
2011-11-06 02:29:13 +00:00
String data_path = context.path + "data/" + database_name_escaped + "/";
2011-11-05 23:31:19 +00:00
String metadata_path = context.path + "metadata/" + database_name_escaped + "/" + (!table_name.empty() ? table_name_escaped + ".sql" : "");
2011-08-18 20:33:20 +00:00
2011-11-05 23:31:19 +00:00
/// CREATE|ATTACH DATABASE
if (!database_name.empty() && table_name.empty())
2011-08-19 18:31:14 +00:00
{
2011-11-05 23:31:19 +00:00
if (context.databases->end() != context.databases->find(database_name))
2011-08-19 18:31:14 +00:00
{
2011-11-05 23:31:19 +00:00
if (!create.if_not_exists)
throw Exception("Database " + database_name + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS);
return NULL;
}
if (create.attach)
{
if (!Poco::File(data_path).exists())
throw Exception("Directory " + data_path + " doesn't exist.", ErrorCodes::DIRECTORY_DOESNT_EXIST);
}
else
{
if (!create.if_not_exists && Poco::File(metadata_path).exists())
throw Exception("Directory " + metadata_path + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (!create.if_not_exists && Poco::File(data_path).exists())
throw Exception("Directory " + data_path + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
Poco::File(metadata_path).createDirectory();
Poco::File(data_path).createDirectory();
2011-08-19 18:31:14 +00:00
}
2011-11-05 23:31:19 +00:00
(*context.databases)[database_name];
return NULL;
}
if (context.databases->end() == context.databases->find(database_name))
throw Exception("Database " + database_name + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
if ((*context.databases)[database_name].end() != (*context.databases)[database_name].find(table_name))
{
if (create.if_not_exists)
return (*context.databases)[database_name][table_name];
else
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
2011-08-19 18:31:14 +00:00
}
2011-08-18 20:33:20 +00:00
2011-10-31 17:30:44 +00:00
if (!create.as_table.empty()
&& ((*context.databases).end() == (*context.databases).find(as_database_name)
|| (*context.databases)[as_database_name].end() == (*context.databases)[as_database_name].find(as_table_name)))
throw Exception("Table " + as_database_name + "." + as_table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
2011-08-18 20:33:20 +00:00
2011-11-01 15:16:04 +00:00
SharedPtr<InterpreterSelectQuery> interpreter_select;
if (create.select)
2012-02-27 06:28:20 +00:00
interpreter_select = new InterpreterSelectQuery(create.select, context, max_threads, max_block_size);
2011-11-01 15:16:04 +00:00
/// Получаем список столбцов
2011-10-31 17:30:44 +00:00
if (create.columns)
2011-08-18 20:33:20 +00:00
{
2011-10-31 17:30:44 +00:00
ASTExpressionList & columns_list = dynamic_cast<ASTExpressionList &>(*create.columns);
for (ASTs::iterator it = columns_list.children.begin(); it != columns_list.children.end(); ++it)
{
ASTNameTypePair & name_and_type_pair = dynamic_cast<ASTNameTypePair &>(**it);
StringRange type_range = name_and_type_pair.type->range;
2011-11-01 17:12:11 +00:00
columns->push_back(NameAndTypePair(
name_and_type_pair.name,
context.data_type_factory->get(String(type_range.first, type_range.second - type_range.first))));
2011-10-31 17:30:44 +00:00
}
2011-08-18 20:33:20 +00:00
}
2011-10-31 17:30:44 +00:00
else if (!create.as_table.empty())
2011-11-01 17:12:11 +00:00
columns = new NamesAndTypesList((*context.databases)[as_database_name][as_table_name]->getColumnsList());
2011-11-01 15:16:04 +00:00
else if (create.select)
{
Block sample = interpreter_select->getSampleBlock();
2011-11-01 17:12:11 +00:00
columns = new NamesAndTypesList;
2011-11-01 15:16:04 +00:00
for (size_t i = 0; i < sample.columns(); ++i)
2011-11-01 17:12:11 +00:00
columns->push_back(NameAndTypePair(sample.getByPosition(i).name, sample.getByPosition(i).type));
2011-11-01 15:16:04 +00:00
}
2011-10-31 17:30:44 +00:00
else
2011-11-01 15:16:04 +00:00
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
2011-08-18 20:33:20 +00:00
2011-10-31 17:30:44 +00:00
/// Выбор нужного движка таблицы
String storage_name;
if (create.storage)
storage_name = dynamic_cast<ASTFunction &>(*create.storage).name;
else if (!create.as_table.empty())
storage_name = (*context.databases)[as_database_name][as_table_name]->getName();
2011-08-18 20:33:20 +00:00
else
2011-11-01 15:38:01 +00:00
throw Exception("Incorrect CREATE query: required ENGINE.", ErrorCodes::INCORRECT_QUERY);
2011-10-31 17:30:44 +00:00
StoragePtr res = context.storage_factory->get(storage_name, data_path, table_name, columns);
2011-08-18 20:33:20 +00:00
2011-08-19 18:31:14 +00:00
/// Проверка наличия метаданных таблицы на диске и создание метаданных
2011-08-18 20:33:20 +00:00
2011-08-19 18:31:14 +00:00
if (Poco::File(metadata_path).exists())
2011-10-31 06:37:12 +00:00
{
/** Запрос ATTACH TABLE может использоваться, чтобы создать в оперативке ссылку на уже существующую таблицу.
* Это используется, например, при загрузке сервера.
*/
if (!create.attach)
throw Exception("Metadata for table " + database_name + "." + table_name + " already exists.",
ErrorCodes::TABLE_METADATA_ALREADY_EXISTS);
}
else
{
Poco::FileOutputStream metadata_file(metadata_path);
2011-10-31 17:55:06 +00:00
metadata_file << "ATTACH TABLE " << table_name << "\n"
2011-10-31 17:30:44 +00:00
<< "(\n";
2011-11-01 17:12:11 +00:00
for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
2011-11-01 17:57:37 +00:00
{
String quoted_column_name;
2011-11-03 20:30:12 +00:00
{
WriteBufferFromString buf(quoted_column_name);
2011-11-06 20:47:07 +00:00
writeProbablyBackQuotedString(it->first, buf);
2011-11-03 20:30:12 +00:00
}
2011-11-01 17:57:37 +00:00
metadata_file << (it != columns->begin() ? ",\n" : "") << "\t" << quoted_column_name << " " << it->second->getName();
}
2011-10-31 17:30:44 +00:00
metadata_file << "\n) ENGINE = " << storage_name << "\n";
2011-10-31 06:37:12 +00:00
}
2011-08-19 18:31:14 +00:00
2011-11-05 23:31:19 +00:00
(*context.databases)[database_name][table_name] = res;
2011-11-01 15:16:04 +00:00
/// Если запрос CREATE SELECT, то вставим в таблицу данные
if (create.select)
2011-11-06 06:22:52 +00:00
{
BlockInputStreamPtr from = new MaterializingBlockInputStream(interpreter_select->execute());
copyData(*from, *res->write(query_ptr));
}
2011-08-19 18:31:14 +00:00
2011-08-18 20:33:20 +00:00
return res;
}
}