dbms: improvement [#METR-2944].

This commit is contained in:
Alexey Milovidov 2015-06-18 05:11:05 +03:00
parent e27323dd2e
commit 0719320758
36 changed files with 376 additions and 568 deletions

View File

@ -41,7 +41,7 @@ public:
{ {
BlockInputStreamPtr from = new OneBlockInputStream(block); BlockInputStreamPtr from = new OneBlockInputStream(block);
InterpreterSelectQuery select(queries[i], context, QueryProcessingStage::Complete, 0, from); InterpreterSelectQuery select(queries[i], context, QueryProcessingStage::Complete, 0, from);
BlockInputStreamPtr data = new MaterializingBlockInputStream(select.execute()); BlockInputStreamPtr data = new MaterializingBlockInputStream(select.execute().in);
copyData(*data, *children[i]); copyData(*data, *children[i]);
} }

View File

@ -0,0 +1,23 @@
#pragma once
#include <DB/DataStreams/BlockIO.h>
namespace DB
{
/** Интерфейс интерпретаторов разных запросов.
*/
class IInterpreter
{
public:
/** Для запросов, возвращающих результат (SELECT и похожие), устанавливает в BlockIO поток, из которого можно будет читать этот результат.
* Для запросов, принимающих данные (INSERT), устанавливает в BlockIO поток, куда можно писать данные.
* Для запросов, которые не требуют данные и ничего не возвращают, BlockIO будет пустым.
*/
virtual BlockIO execute() = 0;
virtual ~IInterpreter() {}
};
}

View File

@ -3,6 +3,7 @@
#include <DB/Storages/IStorage.h> #include <DB/Storages/IStorage.h>
#include <DB/Storages/AlterCommands.h> #include <DB/Storages/AlterCommands.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/Parsers/ASTIdentifier.h> #include <DB/Parsers/ASTIdentifier.h>
namespace DB namespace DB
@ -13,12 +14,12 @@ namespace DB
/** Позволяет добавить или удалить столбец в таблице. /** Позволяет добавить или удалить столбец в таблице.
* Также позволяет осуществить манипуляции с партициями таблиц семейства MergeTree. * Также позволяет осуществить манипуляции с партициями таблиц семейства MergeTree.
*/ */
class InterpreterAlterQuery class InterpreterAlterQuery : public IInterpreter
{ {
public: public:
InterpreterAlterQuery(ASTPtr query_ptr_, Context & context_); InterpreterAlterQuery(ASTPtr query_ptr_, Context & context_);
void execute(); BlockIO execute() override;
/** Изменяет список столбцов в метаданных таблицы на диске. Нужно вызывать под TableStructureLock соответствующей таблицы. /** Изменяет список столбцов в метаданных таблицы на диске. Нужно вызывать под TableStructureLock соответствующей таблицы.
*/ */

View File

@ -1,22 +1,22 @@
#pragma once #pragma once
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/Parsers/ASTIdentifier.h> #include <DB/Parsers/ASTIdentifier.h>
namespace DB namespace DB
{ {
class InterpreterCheckQuery class InterpreterCheckQuery : public IInterpreter
{ {
public: public:
InterpreterCheckQuery(ASTPtr query_ptr_, Context & context_); InterpreterCheckQuery(ASTPtr query_ptr_, Context & context_);
BlockInputStreamPtr execute(); BlockIO execute() override;
DB::Block getSampleBlock();
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;
Context context; Context context;
DB::Block result; Block result;
}; };
} }

View File

@ -2,6 +2,7 @@
#include <DB/Storages/IStorage.h> #include <DB/Storages/IStorage.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/Storages/ColumnDefault.h> #include <DB/Storages/ColumnDefault.h>
@ -11,7 +12,7 @@ namespace DB
/** Позволяет создать новую таблицу, или создать объект уже существующей таблицы, или создать БД, или создать объект уже существующей БД /** Позволяет создать новую таблицу, или создать объект уже существующей таблицы, или создать БД, или создать объект уже существующей БД
*/ */
class InterpreterCreateQuery class InterpreterCreateQuery : public IInterpreter
{ {
public: public:
InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_); InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_);
@ -21,7 +22,19 @@ public:
* assume_metadata_exists - не проверять наличие файла с метаданными и не создавать его * assume_metadata_exists - не проверять наличие файла с метаданными и не создавать его
* (для случая выполнения запроса из существующего файла с метаданными). * (для случая выполнения запроса из существующего файла с метаданными).
*/ */
StoragePtr execute(bool assume_metadata_exists = false); BlockIO execute() override
{
executeImpl(false);
return {};
}
/** assume_metadata_exists - не проверять наличие файла с метаданными и не создавать его
* (для случая выполнения запроса из существующего файла с метаданными).
*/
void executeLoadExisting()
{
executeImpl(true);
}
/// Список столбцов с типами в AST. /// Список столбцов с типами в AST.
static ASTPtr formatColumns(const NamesAndTypesList & columns); static ASTPtr formatColumns(const NamesAndTypesList & columns);
@ -32,6 +45,8 @@ public:
const ColumnDefaults & column_defaults); const ColumnDefaults & column_defaults);
private: private:
void executeImpl(bool assume_metadata_exists);
/// AST в список столбцов с типами. Столбцы типа Nested развернуты в список настоящих столбцов. /// AST в список столбцов с типами. Столбцы типа Nested развернуты в список настоящих столбцов.
using ColumnsAndDefaults = std::pair<NamesAndTypesList, ColumnDefaults>; using ColumnsAndDefaults = std::pair<NamesAndTypesList, ColumnDefaults>;
ColumnsAndDefaults parseColumns(ASTPtr expression_list); ColumnsAndDefaults parseColumns(ASTPtr expression_list);

View File

@ -4,6 +4,7 @@
#include <DB/Parsers/TablePropertiesQueriesASTs.h> #include <DB/Parsers/TablePropertiesQueriesASTs.h>
#include <DB/Parsers/ASTIdentifier.h> #include <DB/Parsers/ASTIdentifier.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/DataStreams/OneBlockInputStream.h> #include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/DataStreams/BlockIO.h> #include <DB/DataStreams/BlockIO.h>
#include <DB/DataStreams/copyData.h> #include <DB/DataStreams/copyData.h>
@ -20,13 +21,13 @@ namespace DB
/** Вернуть названия и типы столбцов указанной таблицы. /** Вернуть названия и типы столбцов указанной таблицы.
*/ */
class InterpreterDescribeQuery class InterpreterDescribeQuery : public IInterpreter
{ {
public: public:
InterpreterDescribeQuery(ASTPtr query_ptr_, Context & context_) InterpreterDescribeQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) {} : query_ptr(query_ptr_), context(context_) {}
BlockIO execute() BlockIO execute() override
{ {
BlockIO res; BlockIO res;
res.in = executeImpl(); res.in = executeImpl();
@ -35,20 +36,6 @@ public:
return res; return res;
} }
BlockInputStreamPtr executeAndFormat(WriteBuffer & buf)
{
Block sample = getSampleBlock();
ASTPtr format_ast = typeid_cast<ASTDescribeQuery &>(*query_ptr).format;
String format_name = format_ast ? typeid_cast<ASTIdentifier &>(*format_ast).name : context.getDefaultFormat();
BlockInputStreamPtr in = executeImpl();
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample);
copyData(*in, *out);
return in;
}
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;
Context context; Context context;

View File

@ -2,6 +2,7 @@
#include <DB/Storages/IStorage.h> #include <DB/Storages/IStorage.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
namespace DB namespace DB
@ -10,13 +11,13 @@ namespace DB
/** Позволяет удалить таблицу вместе со всеми данными (DROP), или удалить информацию о таблице из сервера (DETACH). /** Позволяет удалить таблицу вместе со всеми данными (DROP), или удалить информацию о таблице из сервера (DETACH).
*/ */
class InterpreterDropQuery class InterpreterDropQuery : public IInterpreter
{ {
public: public:
InterpreterDropQuery(ASTPtr query_ptr_, Context & context_); InterpreterDropQuery(ASTPtr query_ptr_, Context & context_);
/// Удаляет таблицу. /// Удаляет таблицу.
void execute(); BlockIO execute() override;
/// Удаляет таблицу, уже отцепленную от контекста (Context::detach). /// Удаляет таблицу, уже отцепленную от контекста (Context::detach).
static void dropDetachedTable(String database_name, StoragePtr table, Context & context); static void dropDetachedTable(String database_name, StoragePtr table, Context & context);

View File

@ -4,6 +4,7 @@
#include <DB/Parsers/TablePropertiesQueriesASTs.h> #include <DB/Parsers/TablePropertiesQueriesASTs.h>
#include <DB/Parsers/ASTIdentifier.h> #include <DB/Parsers/ASTIdentifier.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/DataStreams/OneBlockInputStream.h> #include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/DataStreams/BlockIO.h> #include <DB/DataStreams/BlockIO.h>
#include <DB/DataStreams/FormatFactory.h> #include <DB/DataStreams/FormatFactory.h>
@ -17,13 +18,13 @@ namespace DB
/** Проверить, существует ли таблица. Вернуть одну строку с одним столбцом result типа UInt8 со значением 0 или 1. /** Проверить, существует ли таблица. Вернуть одну строку с одним столбцом result типа UInt8 со значением 0 или 1.
*/ */
class InterpreterExistsQuery class InterpreterExistsQuery : public IInterpreter
{ {
public: public:
InterpreterExistsQuery(ASTPtr query_ptr_, Context & context_) InterpreterExistsQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) {} : query_ptr(query_ptr_), context(context_) {}
BlockIO execute() BlockIO execute() override
{ {
BlockIO res; BlockIO res;
res.in = executeImpl(); res.in = executeImpl();
@ -32,20 +33,6 @@ public:
return res; return res;
} }
BlockInputStreamPtr executeAndFormat(WriteBuffer & buf)
{
Block sample = getSampleBlock();
ASTPtr format_ast = typeid_cast<ASTExistsQuery &>(*query_ptr).format;
String format_name = format_ast ? typeid_cast<ASTIdentifier &>(*format_ast).name : context.getDefaultFormat();
BlockInputStreamPtr in = executeImpl();
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample);
copyData(*in, *out);
return in;
}
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;
Context context; Context context;

View File

@ -0,0 +1,19 @@
#pragma once
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
namespace DB
{
class InterpreterFactory
{
public:
static SharedPtr<IInterpreter> get(
ASTPtr & query,
Context & context,
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete);
};
}

View File

@ -3,6 +3,7 @@
#include <DB/DataStreams/IBlockOutputStream.h> #include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/DataStreams/BlockIO.h> #include <DB/DataStreams/BlockIO.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
namespace DB namespace DB
@ -11,23 +12,17 @@ namespace DB
/** Интерпретирует запрос INSERT. /** Интерпретирует запрос INSERT.
*/ */
class InterpreterInsertQuery class InterpreterInsertQuery : public IInterpreter
{ {
public: public:
InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_); InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_);
/** Выполнить запрос.
* remaining_data_istr, если не nullptr, может содержать нераспарсенные данные для вставки.
* (заранее может быть считан в оперативку для парсинга лишь небольшой кусок запроса, который содержит не все данные)
*/
void execute(ReadBuffer * remaining_data_istr);
/** Подготовить запрос к выполнению. Вернуть потоки блоков /** Подготовить запрос к выполнению. Вернуть потоки блоков
* - поток, в который можно писать данные для выполнения запроса, если INSERT; * - поток, в который можно писать данные для выполнения запроса, если INSERT;
* - поток, из которого можно читать результат выполнения запроса, если SELECT и подобные; * - поток, из которого можно читать результат выполнения запроса, если SELECT и подобные;
* Или ничего, если запрос INSERT SELECT (самодостаточный запрос - не принимает входные данные, не отдаёт результат). * Или ничего, если запрос INSERT SELECT (самодостаточный запрос - не принимает входные данные, не отдаёт результат).
*/ */
BlockIO execute(); BlockIO execute() override;
private: private:
StoragePtr getTable(); StoragePtr getTable();

View File

@ -3,6 +3,7 @@
#include <DB/Storages/IStorage.h> #include <DB/Storages/IStorage.h>
#include <DB/Parsers/ASTOptimizeQuery.h> #include <DB/Parsers/ASTOptimizeQuery.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
namespace DB namespace DB
@ -11,7 +12,7 @@ namespace DB
/** Просто вызвать метод optimize у таблицы. /** Просто вызвать метод optimize у таблицы.
*/ */
class InterpreterOptimizeQuery class InterpreterOptimizeQuery : public IInterpreter
{ {
public: public:
InterpreterOptimizeQuery(ASTPtr query_ptr_, Context & context_) InterpreterOptimizeQuery(ASTPtr query_ptr_, Context & context_)
@ -19,12 +20,13 @@ public:
{ {
} }
void execute() BlockIO execute() override
{ {
const ASTOptimizeQuery & ast = typeid_cast<const ASTOptimizeQuery &>(*query_ptr); const ASTOptimizeQuery & ast = typeid_cast<const ASTOptimizeQuery &>(*query_ptr);
StoragePtr table = context.getTable(ast.database, ast.table); StoragePtr table = context.getTable(ast.database, ast.table);
auto table_lock = table->lockStructure(true); auto table_lock = table->lockStructure(true);
table->optimize(context.getSettings()); table->optimize(context.getSettings());
return {};
} }
private: private:

View File

@ -1,50 +0,0 @@
#pragma once
#include <DB/Core/QueryProcessingStage.h>
#include <DB/DataStreams/BlockIO.h>
#include <DB/Interpreters/Context.h>
namespace DB
{
/** Интерпретирует произвольный запрос.
*/
class InterpreterQuery
{
public:
InterpreterQuery(ASTPtr query_ptr_, Context & context_, QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/** Выполнить запрос.
*
* ostr - куда писать результат выполнения запроса, если он есть.
*
* remaining_data_istr, если не nullptr, может содержать нераспарсенный остаток запроса с данными.
* (заранее может быть считан в оперативку для парсинга лишь небольшой кусок запроса, который содержит не все данные)
*
* В query_plan,
* после выполнения запроса, может быть записан BlockInputStreamPtr,
* использовавшийся при выполнении запроса,
* чтобы можно было получить информацию о том, как выполнялся запрос.
*/
void execute(WriteBuffer & ostr, ReadBuffer * remaining_data_istr, BlockInputStreamPtr & query_plan);
/** Подготовить запрос к выполнению. Вернуть потоки блоков, используя которые можно выполнить запрос.
*/
BlockIO execute();
private:
ASTPtr query_ptr;
Context context;
QueryProcessingStage::Enum stage;
void throwIfReadOnly()
{
if (context.getSettingsRef().limits.readonly)
throw Exception("Cannot execute query in readonly mode", ErrorCodes::READONLY);
}
};
}

View File

@ -2,6 +2,7 @@
#include <DB/Storages/IStorage.h> #include <DB/Storages/IStorage.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
namespace DB namespace DB
@ -10,11 +11,11 @@ namespace DB
/** Переименовать одну или несколько таблиц. /** Переименовать одну или несколько таблиц.
*/ */
class InterpreterRenameQuery class InterpreterRenameQuery : public IInterpreter
{ {
public: public:
InterpreterRenameQuery(ASTPtr query_ptr_, Context & context_); InterpreterRenameQuery(ASTPtr query_ptr_, Context & context_);
void execute(); BlockIO execute() override;
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;

View File

@ -2,6 +2,7 @@
#include <DB/Core/QueryProcessingStage.h> #include <DB/Core/QueryProcessingStage.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/Interpreters/ExpressionActions.h> #include <DB/Interpreters/ExpressionActions.h>
#include <DB/DataStreams/IBlockInputStream.h> #include <DB/DataStreams/IBlockInputStream.h>
@ -15,7 +16,7 @@ class SubqueryForSet;
/** Интерпретирует запрос SELECT. Возвращает поток блоков с результатами выполнения запроса до стадии to_stage. /** Интерпретирует запрос SELECT. Возвращает поток блоков с результатами выполнения запроса до стадии to_stage.
*/ */
class InterpreterSelectQuery class InterpreterSelectQuery : public IInterpreter
{ {
public: public:
/** to_stage /** to_stage
@ -66,17 +67,12 @@ public:
/** Выполнить запрос, возможно являющиийся цепочкой UNION ALL. /** Выполнить запрос, возможно являющиийся цепочкой UNION ALL.
* Получить поток блоков для чтения * Получить поток блоков для чтения
*/ */
BlockInputStreamPtr execute(); BlockIO execute() override;
/** Выполнить запрос без объединения потоков, если это возможно. /** Выполнить запрос без объединения потоков, если это возможно.
*/ */
const BlockInputStreams & executeWithoutUnion(); const BlockInputStreams & executeWithoutUnion();
/** Выполнить запрос, записать результат в нужном формате в buf.
* BlockInputStreamPtr возвращается, чтобы можно было потом получить информацию о плане выполнения запроса.
*/
BlockInputStreamPtr executeAndFormat(WriteBuffer & buf);
DataTypes getReturnTypes(); DataTypes getReturnTypes();
Block getSampleBlock(); Block getSampleBlock();

View File

@ -2,6 +2,7 @@
#include <DB/Parsers/ASTSetQuery.h> #include <DB/Parsers/ASTSetQuery.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
namespace DB namespace DB
@ -10,20 +11,20 @@ namespace DB
/** Установить один или несколько параметров, для сессии или глобально... или для текущего запроса. /** Установить один или несколько параметров, для сессии или глобально... или для текущего запроса.
*/ */
class InterpreterSetQuery class InterpreterSetQuery : public IInterpreter
{ {
public: public:
InterpreterSetQuery(ASTPtr query_ptr_, Context & context_) InterpreterSetQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) {} : query_ptr(query_ptr_), context(context_) {}
/** Обычный запрос SET. Задать настройку на сессию или глобальную (если указано GLOBAL). /** Обычный запрос SET. Задать настройку на сессию или глобальную (если указано GLOBAL).
*/ */
void execute() BlockIO execute() override
{ {
ASTSetQuery & ast = typeid_cast<ASTSetQuery &>(*query_ptr); ASTSetQuery & ast = typeid_cast<ASTSetQuery &>(*query_ptr);
Context & target = ast.global ? context.getGlobalContext() : context.getSessionContext(); Context & target = ast.global ? context.getGlobalContext() : context.getSessionContext();
executeImpl(ast, target); executeImpl(ast, target);
return {};
} }
/** Задать настроку для текущего контекста (контекста запроса). /** Задать настроку для текущего контекста (контекста запроса).

View File

@ -5,6 +5,7 @@
#include <DB/Parsers/ASTIdentifier.h> #include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/formatAST.h> #include <DB/Parsers/formatAST.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/DataStreams/OneBlockInputStream.h> #include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/DataStreams/BlockIO.h> #include <DB/DataStreams/BlockIO.h>
#include <DB/DataStreams/copyData.h> #include <DB/DataStreams/copyData.h>
@ -18,13 +19,13 @@ namespace DB
/** Вернуть одну строку с одним столбцом statement типа String с текстом запроса, создающего указанную таблицу. /** Вернуть одну строку с одним столбцом statement типа String с текстом запроса, создающего указанную таблицу.
*/ */
class InterpreterShowCreateQuery class InterpreterShowCreateQuery : public IInterpreter
{ {
public: public:
InterpreterShowCreateQuery(ASTPtr query_ptr_, Context & context_) InterpreterShowCreateQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) {} : query_ptr(query_ptr_), context(context_) {}
BlockIO execute() BlockIO execute() override
{ {
BlockIO res; BlockIO res;
res.in = executeImpl(); res.in = executeImpl();
@ -33,20 +34,6 @@ public:
return res; return res;
} }
BlockInputStreamPtr executeAndFormat(WriteBuffer & buf)
{
Block sample = getSampleBlock();
ASTPtr format_ast = typeid_cast<ASTShowCreateQuery &>(*query_ptr).format;
String format_name = format_ast ? typeid_cast<ASTIdentifier &>(*format_ast).name : context.getDefaultFormat();
BlockInputStreamPtr in = executeImpl();
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample);
copyData(*in, *out);
return in;
}
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;
Context context; Context context;

View File

@ -3,6 +3,7 @@
#include <DB/IO/ReadBufferFromString.h> #include <DB/IO/ReadBufferFromString.h>
#include <DB/Interpreters/executeQuery.h> #include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/IInterpreter.h>
#include <DB/Parsers/ASTQueryWithOutput.h> #include <DB/Parsers/ASTQueryWithOutput.h>
#include <DB/Parsers/ASTIdentifier.h> #include <DB/Parsers/ASTIdentifier.h>
@ -14,26 +15,17 @@ namespace DB
/** Вернуть список запросов, исполняющихся прямо сейчас. /** Вернуть список запросов, исполняющихся прямо сейчас.
*/ */
class InterpreterShowProcesslistQuery class InterpreterShowProcesslistQuery : public IInterpreter
{ {
public: public:
InterpreterShowProcesslistQuery(ASTPtr query_ptr_, Context & context_) InterpreterShowProcesslistQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) {} : query_ptr(query_ptr_), context(context_) {}
BlockIO execute() BlockIO execute() override
{ {
return executeQuery(getRewrittenQuery(), context, true); return executeQuery(getRewrittenQuery(), context, true);
} }
BlockInputStreamPtr executeAndFormat(WriteBuffer & buf)
{
String query = getRewrittenQuery();
ReadBufferFromString in(query);
BlockInputStreamPtr query_plan;
executeQuery(in, buf, context, query_plan, true);
return query_plan;
}
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;
Context context; Context context;

View File

@ -1,8 +1,7 @@
#pragma once #pragma once
#include <DB/DataStreams/BlockIO.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
namespace DB namespace DB
@ -12,13 +11,12 @@ namespace DB
/** Вывести список имён таблиц/баз данных по некоторым условиям. /** Вывести список имён таблиц/баз данных по некоторым условиям.
* Интерпретирует запрос путём замены его на запрос SELECT из таблицы system.tables или system.databases. * Интерпретирует запрос путём замены его на запрос SELECT из таблицы system.tables или system.databases.
*/ */
class InterpreterShowTablesQuery class InterpreterShowTablesQuery : public IInterpreter
{ {
public: public:
InterpreterShowTablesQuery(ASTPtr query_ptr_, Context & context_); InterpreterShowTablesQuery(ASTPtr query_ptr_, Context & context_);
BlockIO execute(); BlockIO execute() override;
BlockInputStreamPtr executeAndFormat(WriteBuffer & buf);
private: private:
ASTPtr query_ptr; ASTPtr query_ptr;

View File

@ -2,6 +2,7 @@
#include <DB/Parsers/ASTUseQuery.h> #include <DB/Parsers/ASTUseQuery.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Interpreters/IInterpreter.h>
namespace DB namespace DB
@ -10,16 +11,17 @@ namespace DB
/** Выбрать БД по-умолчанию для сессии. /** Выбрать БД по-умолчанию для сессии.
*/ */
class InterpreterUseQuery class InterpreterUseQuery : public IInterpreter
{ {
public: public:
InterpreterUseQuery(ASTPtr query_ptr_, Context & context_) InterpreterUseQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) {} : query_ptr(query_ptr_), context(context_) {}
void execute() BlockIO execute() override
{ {
const String & new_database = typeid_cast<const ASTUseQuery &>(*query_ptr).database; const String & new_database = typeid_cast<const ASTUseQuery &>(*query_ptr).database;
context.getSessionContext().setCurrentDatabase(new_database); context.getSessionContext().setCurrentDatabase(new_database);
return {};
} }
private: private:

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <DB/Core/QueryProcessingStage.h> #include <DB/Core/QueryProcessingStage.h>
#include <DB/Interpreters/InterpreterQuery.h> #include <DB/DataStreams/BlockIO.h>
namespace DB namespace DB

View File

@ -768,7 +768,7 @@ void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name)
StoragePtr external_storage = StorageMemory::create(external_table_name, columns); StoragePtr external_storage = StorageMemory::create(external_table_name, columns);
external_tables[external_table_name] = external_storage; external_tables[external_table_name] = external_storage;
subqueries_for_sets[external_table_name].source = interpreter->execute(); subqueries_for_sets[external_table_name].source = interpreter->execute().in;
subqueries_for_sets[external_table_name].source_sample = interpreter->getSampleBlock(); subqueries_for_sets[external_table_name].source_sample = interpreter->getSampleBlock();
subqueries_for_sets[external_table_name].table = external_storage; subqueries_for_sets[external_table_name].table = external_storage;
@ -842,7 +842,7 @@ void ExpressionAnalyzer::makeSet(ASTFunction * node, const Block & sample_block)
if (!subquery_for_set.source) if (!subquery_for_set.source)
{ {
auto interpreter = interpretSubquery(arg, context, subquery_depth); auto interpreter = interpretSubquery(arg, context, subquery_depth);
subquery_for_set.source = new LazyBlockInputStream([interpreter]() mutable { return interpreter->execute(); }); subquery_for_set.source = new LazyBlockInputStream([interpreter]() mutable { return interpreter->execute().in; });
subquery_for_set.source_sample = interpreter->getSampleBlock(); subquery_for_set.source_sample = interpreter->getSampleBlock();
/** Зачем используется LazyBlockInputStream? /** Зачем используется LazyBlockInputStream?
@ -1594,7 +1594,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
if (!subquery_for_set.source) if (!subquery_for_set.source)
{ {
auto interpreter = interpretSubquery(ast_join.table, context, subquery_depth, required_joined_columns); auto interpreter = interpretSubquery(ast_join.table, context, subquery_depth, required_joined_columns);
subquery_for_set.source = new LazyBlockInputStream([interpreter]() mutable { return interpreter->execute(); }); subquery_for_set.source = new LazyBlockInputStream([interpreter]() mutable { return interpreter->execute().in; });
subquery_for_set.source_sample = interpreter->getSampleBlock(); subquery_for_set.source_sample = interpreter->getSampleBlock();
} }

View File

@ -27,7 +27,7 @@ InterpreterAlterQuery::InterpreterAlterQuery(ASTPtr query_ptr_, Context & contex
{ {
} }
void InterpreterAlterQuery::execute() BlockIO InterpreterAlterQuery::execute()
{ {
auto & alter = typeid_cast<ASTAlterQuery &>(*query_ptr); auto & alter = typeid_cast<ASTAlterQuery &>(*query_ptr);
const String & table_name = alter.table; const String & table_name = alter.table;
@ -64,11 +64,13 @@ void InterpreterAlterQuery::execute()
} }
if (alter_commands.empty()) if (alter_commands.empty())
return; return {};
alter_commands.validate(table.get(), context); alter_commands.validate(table.get(), context);
table->alter(alter_commands, database_name, table_name, context); table->alter(alter_commands, database_name, table_name, context);
return {};
} }
void InterpreterAlterQuery::parseAlter( void InterpreterAlterQuery::parseAlter(

View File

@ -4,13 +4,14 @@
#include <DB/Columns/ColumnsNumber.h> #include <DB/Columns/ColumnsNumber.h>
#include <DB/DataTypes/DataTypesNumberFixed.h> #include <DB/DataTypes/DataTypesNumberFixed.h>
using namespace DB; namespace DB
{
InterpreterCheckQuery::InterpreterCheckQuery(DB::ASTPtr query_ptr_, DB::Context& context_) : query_ptr(query_ptr_), context(context_) InterpreterCheckQuery::InterpreterCheckQuery(DB::ASTPtr query_ptr_, DB::Context& context_) : query_ptr(query_ptr_), context(context_)
{ {
} }
BlockInputStreamPtr InterpreterCheckQuery::execute() BlockIO InterpreterCheckQuery::execute()
{ {
ASTCheckQuery & alter = typeid_cast<ASTCheckQuery &>(*query_ptr); ASTCheckQuery & alter = typeid_cast<ASTCheckQuery &>(*query_ptr);
String & table_name = alter.table; String & table_name = alter.table;
@ -18,16 +19,14 @@ BlockInputStreamPtr InterpreterCheckQuery::execute()
StoragePtr table = context.getTable(database_name, table_name); StoragePtr table = context.getTable(database_name, table_name);
result = getSampleBlock(); result = Block{{ new ColumnUInt8, new DataTypeUInt8, "result" }};
result.getByPosition(0).column->insert(Field(UInt64(table->checkData()))); result.getByPosition(0).column->insert(Field(UInt64(table->checkData())));
return BlockInputStreamPtr(new OneBlockInputStream(result)); BlockIO res;
res.in = new OneBlockInputStream(result);
res.in_sample = result;
return res;
} }
Block InterpreterCheckQuery::getSampleBlock()
{
DB::Block b;
ColumnPtr column(new ColumnUInt8);
b.insert(ColumnWithNameAndType(column, new DataTypeUInt8, "result"));
return b;
} }

View File

@ -42,7 +42,7 @@ InterpreterCreateQuery::InterpreterCreateQuery(ASTPtr query_ptr_, Context & cont
} }
StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists) void InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
{ {
String path = context.getPath(); String path = context.getPath();
String current_database = context.getCurrentDatabase(); String current_database = context.getCurrentDatabase();
@ -80,8 +80,6 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
if (!create.if_not_exists || !context.isDatabaseExist(database_name)) if (!create.if_not_exists || !context.isDatabaseExist(database_name))
context.addDatabase(database_name); context.addDatabase(database_name);
return StoragePtr();
} }
SharedPtr<InterpreterSelectQuery> interpreter_select; SharedPtr<InterpreterSelectQuery> interpreter_select;
@ -118,7 +116,7 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
if (context.isTableExist(database_name, table_name)) if (context.isTableExist(database_name, table_name))
{ {
if (create.if_not_exists) if (create.if_not_exists)
return context.getTable(database_name, table_name); return;
else else
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
} }
@ -251,11 +249,9 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
/// Если запрос CREATE SELECT, то вставим в таблицу данные /// Если запрос CREATE SELECT, то вставим в таблицу данные
if (create.select && storage_name != "View" && (storage_name != "MaterializedView" || create.is_populate)) if (create.select && storage_name != "View" && (storage_name != "MaterializedView" || create.is_populate))
{ {
BlockInputStreamPtr from = new MaterializingBlockInputStream(interpreter_select->execute()); BlockInputStreamPtr from = new MaterializingBlockInputStream(interpreter_select->execute().in);
copyData(*from, *res->write(query_ptr)); copyData(*from, *res->write(query_ptr));
} }
return res;
} }
InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(ASTPtr expression_list) InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(ASTPtr expression_list)

View File

@ -16,7 +16,7 @@ InterpreterDropQuery::InterpreterDropQuery(ASTPtr query_ptr_, Context & context_
} }
void InterpreterDropQuery::execute() BlockIO InterpreterDropQuery::execute()
{ {
String path = context.getPath(); String path = context.getPath();
String current_database = context.getCurrentDatabase(); String current_database = context.getCurrentDatabase();
@ -43,7 +43,7 @@ void InterpreterDropQuery::execute()
if (table) if (table)
tables_to_drop.push_back(table); tables_to_drop.push_back(table);
else else
return; return {};
} }
else else
{ {
@ -52,7 +52,7 @@ void InterpreterDropQuery::execute()
if (!drop.if_exists) if (!drop.if_exists)
context.assertDatabaseExists(database_name); context.assertDatabaseExists(database_name);
else if (!context.isDatabaseExist(database_name)) else if (!context.isDatabaseExist(database_name))
return; return {};
Tables tables = context.getDatabases()[database_name]; Tables tables = context.getDatabases()[database_name];
@ -111,6 +111,8 @@ void InterpreterDropQuery::execute()
Poco::File(data_path).remove(false); Poco::File(data_path).remove(false);
Poco::File(metadata_path).remove(false); Poco::File(metadata_path).remove(false);
} }
return {};
} }
void InterpreterDropQuery::dropDetachedTable(String database_name, StoragePtr table, Context & context) void InterpreterDropQuery::dropDetachedTable(String database_name, StoragePtr table, Context & context)

View File

@ -0,0 +1,117 @@
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ASTDropQuery.h>
#include <DB/Parsers/ASTRenameQuery.h>
#include <DB/Parsers/ASTShowTablesQuery.h>
#include <DB/Parsers/ASTUseQuery.h>
#include <DB/Parsers/ASTSetQuery.h>
#include <DB/Parsers/ASTOptimizeQuery.h>
#include <DB/Parsers/ASTAlterQuery.h>
#include <DB/Parsers/ASTShowProcesslistQuery.h>
#include <DB/Parsers/TablePropertiesQueriesASTs.h>
#include <DB/Parsers/ASTCheckQuery.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <DB/Interpreters/InterpreterCreateQuery.h>
#include <DB/Interpreters/InterpreterDropQuery.h>
#include <DB/Interpreters/InterpreterRenameQuery.h>
#include <DB/Interpreters/InterpreterShowTablesQuery.h>
#include <DB/Interpreters/InterpreterUseQuery.h>
#include <DB/Interpreters/InterpreterSetQuery.h>
#include <DB/Interpreters/InterpreterOptimizeQuery.h>
#include <DB/Interpreters/InterpreterExistsQuery.h>
#include <DB/Interpreters/InterpreterDescribeQuery.h>
#include <DB/Interpreters/InterpreterShowCreateQuery.h>
#include <DB/Interpreters/InterpreterQuery.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Interpreters/InterpreterShowProcesslistQuery.h>
#include <DB/Interpreters/InterpreterCheckQuery.h>
#include <DB/Interpreters/InterpreterFactory.h>
namespace DB
{
static void throwIfReadOnly(Context & context)
{
if (context.getSettingsRef().limits.readonly)
throw Exception("Cannot execute query in readonly mode", ErrorCodes::READONLY);
}
SharedPtr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage)
{
if (typeid_cast<ASTSelectQuery *>(query.get()))
{
return new InterpreterSelectQuery(query, context, stage);
}
else if (typeid_cast<ASTInsertQuery *>(query.get()))
{
throwIfReadOnly(context);
return new InterpreterInsertQuery(query, context);
}
else if (typeid_cast<ASTCreateQuery *>(query.get()))
{
throwIfReadOnly(context);
return new InterpreterCreateQuery(query, context);
}
else if (typeid_cast<ASTDropQuery *>(query.get()))
{
throwIfReadOnly(context);
return new InterpreterDropQuery(query, context);
}
else if (typeid_cast<ASTRenameQuery *>(query.get()))
{
throwIfReadOnly(context);
return new InterpreterRenameQuery(query, context);
}
else if (typeid_cast<ASTShowTablesQuery *>(query.get()))
{
return new InterpreterShowTablesQuery(query, context);
}
else if (typeid_cast<ASTUseQuery *>(query.get()))
{
return new InterpreterUseQuery(query, context);
}
else if (typeid_cast<ASTSetQuery *>(query.get()))
{
/// readonly проверяется внутри InterpreterSetQuery
return new InterpreterSetQuery(query, context);
}
else if (typeid_cast<ASTOptimizeQuery *>(query.get()))
{
throwIfReadOnly(context);
return new InterpreterOptimizeQuery(query, context);
}
else if (typeid_cast<ASTExistsQuery *>(query.get()))
{
return new InterpreterExistsQuery(query, context);
}
else if (typeid_cast<ASTShowCreateQuery *>(query.get()))
{
return new InterpreterShowCreateQuery(query, context);
}
else if (typeid_cast<ASTDescribeQuery *>(query.get()))
{
return new InterpreterDescribeQuery(query, context);
}
else if (typeid_cast<ASTShowProcesslistQuery *>(query.get()))
{
return new InterpreterShowProcesslistQuery(query, context);
}
else if (typeid_cast<ASTAlterQuery *>(query.get()))
{
throwIfReadOnly(context);
return new InterpreterAlterQuery(query, context);
}
else if (typeid_cast<ASTCheckQuery *>(query.get()))
{
return new InterpreterCheckQuery(query, context);
}
else
throw Exception("Unknown type of query: " + query->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
}
}

View File

@ -64,69 +64,6 @@ Block InterpreterInsertQuery::getSampleBlock()
return res; return res;
} }
void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
{
ASTInsertQuery & query = typeid_cast<ASTInsertQuery &>(*query_ptr);
StoragePtr table = getTable();
auto table_lock = table->lockStructure(true);
/** @note looks suspicious, first we ask to create block from NamesAndTypesList (internally in ITableDeclaration),
* then we compose the same list from the resulting block */
NamesAndTypesListPtr required_columns = new NamesAndTypesList(table->getColumnsList());
/// Создаем кортеж из нескольких стримов, в которые будем писать данные.
BlockOutputStreamPtr out{
new ProhibitColumnsBlockOutputStream{
new AddingDefaultBlockOutputStream{
new MaterializingBlockOutputStream{
new PushingToViewsBlockOutputStream{query.database, query.table, context, query_ptr}
},
required_columns, table->column_defaults, context, context.getSettingsRef().strict_insert_defaults
},
table->materialized_columns
}
};
/// Какой тип запроса: INSERT VALUES | INSERT FORMAT | INSERT SELECT?
if (!query.select)
{
String format = query.format;
if (format.empty())
format = "Values";
/// Данные могут содержаться в распарсенной (query.data) и ещё не распарсенной (remaining_data_istr) части запроса.
ConcatReadBuffer::ReadBuffers buffers;
ReadBuffer buf1(const_cast<char *>(query.data), query.data ? query.end - query.data : 0, 0);
if (query.data)
buffers.push_back(&buf1);
buffers.push_back(remaining_data_istr);
/** NOTE Нельзя читать из remaining_data_istr до того, как прочтём всё между query.data и query.end.
* - потому что query.data может ссылаться на кусок памяти, использующийся в качестве буфера в remaining_data_istr.
*/
ConcatReadBuffer istr(buffers);
Block sample = getSampleBlock();
BlockInputStreamPtr in{
context.getFormatFactory().getInput(
format, istr, sample, context.getSettings().max_insert_block_size)};
copyData(*in, *out);
}
else
{
InterpreterSelectQuery interpreter_select(query.select, context);
BlockInputStreamPtr in{interpreter_select.execute()};
copyData(*in, *out);
}
}
BlockIO InterpreterInsertQuery::execute() BlockIO InterpreterInsertQuery::execute()
{ {
@ -161,7 +98,7 @@ BlockIO InterpreterInsertQuery::execute()
else else
{ {
InterpreterSelectQuery interpreter_select{query.select, context}; InterpreterSelectQuery interpreter_select{query.select, context};
BlockInputStreamPtr in{interpreter_select.execute()}; BlockInputStreamPtr in{interpreter_select.execute().in};
res.in = new NullAndDoCopyBlockInputStream{in, out}; res.in = new NullAndDoCopyBlockInputStream{in, out};
} }

View File

@ -1,227 +0,0 @@
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ASTDropQuery.h>
#include <DB/Parsers/ASTRenameQuery.h>
#include <DB/Parsers/ASTShowTablesQuery.h>
#include <DB/Parsers/ASTUseQuery.h>
#include <DB/Parsers/ASTSetQuery.h>
#include <DB/Parsers/ASTOptimizeQuery.h>
#include <DB/Parsers/ASTAlterQuery.h>
#include <DB/Parsers/ASTShowProcesslistQuery.h>
#include <DB/Parsers/TablePropertiesQueriesASTs.h>
#include <DB/Parsers/ASTCheckQuery.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <DB/Interpreters/InterpreterCreateQuery.h>
#include <DB/Interpreters/InterpreterDropQuery.h>
#include <DB/Interpreters/InterpreterRenameQuery.h>
#include <DB/Interpreters/InterpreterShowTablesQuery.h>
#include <DB/Interpreters/InterpreterUseQuery.h>
#include <DB/Interpreters/InterpreterSetQuery.h>
#include <DB/Interpreters/InterpreterOptimizeQuery.h>
#include <DB/Interpreters/InterpreterExistsQuery.h>
#include <DB/Interpreters/InterpreterDescribeQuery.h>
#include <DB/Interpreters/InterpreterShowCreateQuery.h>
#include <DB/Interpreters/InterpreterQuery.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Interpreters/InterpreterShowProcesslistQuery.h>
#include <DB/Interpreters/InterpreterCheckQuery.h>
namespace DB
{
InterpreterQuery::InterpreterQuery(ASTPtr query_ptr_, Context & context_, QueryProcessingStage::Enum stage_)
: query_ptr(query_ptr_), context(context_), stage(stage_)
{
}
void InterpreterQuery::execute(WriteBuffer & ostr, ReadBuffer * remaining_data_istr, BlockInputStreamPtr & query_plan)
{
if (typeid_cast<ASTSelectQuery *>(&*query_ptr))
{
InterpreterSelectQuery interpreter(query_ptr, context, stage);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (typeid_cast<ASTInsertQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterInsertQuery interpreter(query_ptr, context);
interpreter.execute(remaining_data_istr);
}
else if (typeid_cast<ASTCreateQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterCreateQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTDropQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterDropQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTRenameQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterRenameQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTShowTablesQuery *>(&*query_ptr))
{
InterpreterShowTablesQuery interpreter(query_ptr, context);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (typeid_cast<ASTUseQuery *>(&*query_ptr))
{
InterpreterUseQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTSetQuery *>(&*query_ptr))
{
/// readonly проверяется внутри InterpreterSetQuery
InterpreterSetQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTOptimizeQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterOptimizeQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTExistsQuery *>(&*query_ptr))
{
InterpreterExistsQuery interpreter(query_ptr, context);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (typeid_cast<ASTShowCreateQuery *>(&*query_ptr))
{
InterpreterShowCreateQuery interpreter(query_ptr, context);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (typeid_cast<ASTDescribeQuery *>(&*query_ptr))
{
InterpreterDescribeQuery interpreter(query_ptr, context);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (typeid_cast<ASTShowProcesslistQuery *>(&*query_ptr))
{
InterpreterShowProcesslistQuery interpreter(query_ptr, context);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (typeid_cast<ASTAlterQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterAlterQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTCheckQuery *>(&*query_ptr))
{
InterpreterCheckQuery interpreter(query_ptr, context);
query_plan = interpreter.execute();
}
else
throw Exception("Unknown type of query: " + query_ptr->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
}
BlockIO InterpreterQuery::execute()
{
BlockIO res;
if (typeid_cast<ASTSelectQuery *>(&*query_ptr))
{
InterpreterSelectQuery interpreter(query_ptr, context, stage);
res.in = interpreter.execute();
res.in_sample = interpreter.getSampleBlock();
}
else if (typeid_cast<ASTInsertQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterInsertQuery interpreter(query_ptr, context);
res = interpreter.execute();
}
else if (typeid_cast<ASTCreateQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterCreateQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTDropQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterDropQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTRenameQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterRenameQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTShowTablesQuery *>(&*query_ptr))
{
InterpreterShowTablesQuery interpreter(query_ptr, context);
res = interpreter.execute();
}
else if (typeid_cast<ASTUseQuery *>(&*query_ptr))
{
InterpreterUseQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTSetQuery *>(&*query_ptr))
{
/// readonly проверяется внутри InterpreterSetQuery
InterpreterSetQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTOptimizeQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterOptimizeQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTExistsQuery *>(&*query_ptr))
{
InterpreterExistsQuery interpreter(query_ptr, context);
res = interpreter.execute();
}
else if (typeid_cast<ASTShowCreateQuery *>(&*query_ptr))
{
InterpreterShowCreateQuery interpreter(query_ptr, context);
res = interpreter.execute();
}
else if (typeid_cast<ASTDescribeQuery *>(&*query_ptr))
{
InterpreterDescribeQuery interpreter(query_ptr, context);
res = interpreter.execute();
}
else if (typeid_cast<ASTShowProcesslistQuery *>(&*query_ptr))
{
InterpreterShowProcesslistQuery interpreter(query_ptr, context);
res = interpreter.execute();
}
else if (typeid_cast<ASTAlterQuery *>(&*query_ptr))
{
throwIfReadOnly();
InterpreterAlterQuery interpreter(query_ptr, context);
interpreter.execute();
}
else if (typeid_cast<ASTCheckQuery *>(&*query_ptr))
{
InterpreterCheckQuery interpreter(query_ptr, context);
res.in = interpreter.execute();
res.in_sample = interpreter.getSampleBlock();
}
else
throw Exception("Unknown type of query: " + query_ptr->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
return res;
}
}

View File

@ -58,7 +58,7 @@ struct RenameDescription
}; };
void InterpreterRenameQuery::execute() BlockIO InterpreterRenameQuery::execute()
{ {
String path = context.getPath(); String path = context.getPath();
String current_database = context.getCurrentDatabase(); String current_database = context.getCurrentDatabase();
@ -151,6 +151,8 @@ void InterpreterRenameQuery::execute()
/// Удаляем старый файл с метаданными. /// Удаляем старый файл с метаданными.
Poco::File(elem.from_metadata_path).remove(); Poco::File(elem.from_metadata_path).remove();
} }
return {};
} }

View File

@ -297,12 +297,17 @@ Block InterpreterSelectQuery::getSampleBlock()
} }
BlockInputStreamPtr InterpreterSelectQuery::execute() BlockIO InterpreterSelectQuery::execute()
{ {
(void) executeWithoutUnion(); (void) executeWithoutUnion();
if (streams.empty()) if (streams.empty())
return new NullBlockInputStream; {
BlockIO res;
res.in = new NullBlockInputStream;
res.in_sample = getSampleBlock();
return res;
}
executeUnion(streams); executeUnion(streams);
@ -326,7 +331,11 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
} }
} }
return streams[0]; BlockIO res;
res.in = streams[0];
res.in_sample = getSampleBlock();
return res;
} }
const BlockInputStreams & InterpreterSelectQuery::executeWithoutUnion() const BlockInputStreams & InterpreterSelectQuery::executeWithoutUnion()
@ -1011,20 +1020,6 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(BlockInputStreams &
} }
BlockInputStreamPtr InterpreterSelectQuery::executeAndFormat(WriteBuffer & buf)
{
Block sample = getSampleBlock();
String format_name = query.format ? typeid_cast<ASTIdentifier &>(*query.format).name : context.getDefaultFormat();
BlockInputStreamPtr in = execute();
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, buf, sample);
copyData(*in, *out);
return in;
}
void InterpreterSelectQuery::ignoreWithTotals() void InterpreterSelectQuery::ignoreWithTotals()
{ {
query.group_by_with_totals = false; query.group_by_with_totals = false;

View File

@ -52,14 +52,4 @@ BlockIO InterpreterShowTablesQuery::execute()
} }
BlockInputStreamPtr InterpreterShowTablesQuery::executeAndFormat(WriteBuffer & buf)
{
String query = getRewrittenQuery();
ReadBufferFromString in(query);
BlockInputStreamPtr query_plan;
executeQuery(in, buf, context, query_plan, true);
return query_plan;
}
} }

View File

@ -1,13 +1,19 @@
#include <DB/Common/ProfileEvents.h> #include <DB/Common/ProfileEvents.h>
#include <DB/Parsers/formatAST.h> #include <DB/IO/ConcatReadBuffer.h>
#include <DB/DataStreams/BlockIO.h> #include <DB/DataStreams/BlockIO.h>
#include <DB/DataStreams/FormatFactory.h>
#include <DB/DataStreams/copyData.h>
#include <DB/Parsers/ASTInsertQuery.h> #include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Parsers/ASTShowProcesslistQuery.h> #include <DB/Parsers/ASTShowProcesslistQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ParserQuery.h> #include <DB/Parsers/ParserQuery.h>
#include <DB/Parsers/parseQuery.h> #include <DB/Parsers/parseQuery.h>
#include <DB/Interpreters/Quota.h> #include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/InterpreterFactory.h>
#include <DB/Interpreters/executeQuery.h> #include <DB/Interpreters/executeQuery.h>
@ -32,11 +38,12 @@ static void logQuery(const String & query, const Context & context)
} }
/** Распарсить запрос. Записать его в лог вместе с IP адресом клиента. static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
* Проверить ограничения. Записать запрос в ProcessList. IParser::Pos begin,
*/ IParser::Pos end,
static std::tuple<ASTPtr, ProcessList::EntryPtr> prepareQuery( Context & context,
IParser::Pos begin, IParser::Pos end, Context & context, bool internal) bool internal,
QueryProcessingStage::Enum stage)
{ {
ProfileEvents::increment(ProfileEvents::Query); ProfileEvents::increment(ProfileEvents::Query);
@ -72,6 +79,11 @@ static std::tuple<ASTPtr, ProcessList::EntryPtr> prepareQuery(
/// Проверка ограничений. /// Проверка ограничений.
checkLimits(*ast, context.getSettingsRef().limits); checkLimits(*ast, context.getSettingsRef().limits);
QuotaForIntervals & quota = context.getQuota();
time_t current_time = time(0);
quota.checkExceeded(current_time);
/// Положим запрос в список процессов. Но запрос SHOW PROCESSLIST класть не будем. /// Положим запрос в список процессов. Но запрос SHOW PROCESSLIST класть не будем.
ProcessList::EntryPtr process_list_entry; ProcessList::EntryPtr process_list_entry;
if (!internal && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast)) if (!internal && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast))
@ -85,7 +97,37 @@ static std::tuple<ASTPtr, ProcessList::EntryPtr> prepareQuery(
context.setProcessListElement(&process_list_entry->get()); context.setProcessListElement(&process_list_entry->get());
} }
return std::make_tuple(ast, process_list_entry); BlockIO res;
try
{
auto interpreter = InterpreterFactory::get(ast, context, stage);
res = interpreter->execute();
/// Держим элемент списка процессов до конца обработки запроса.
res.process_list_entry = process_list_entry;
}
catch (...)
{
quota.addError(current_time);
throw;
}
quota.addQuery(current_time);
return std::make_tuple(ast, res);
}
BlockIO executeQuery(
const String & query,
Context & context,
bool internal,
QueryProcessingStage::Enum stage)
{
BlockIO streams;
std::tie(std::ignore, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context, internal, stage);
return streams;
} }
@ -124,65 +166,55 @@ void executeQuery(
} }
ASTPtr ast; ASTPtr ast;
ProcessList::EntryPtr process_list_entry; BlockIO streams;
std::tie(ast, process_list_entry) = prepareQuery(begin, end, context, internal); std::tie(ast, streams) = executeQueryImpl(begin, end, context, internal, stage);
QuotaForIntervals & quota = context.getQuota(); if (streams.out)
time_t current_time = time(0);
quota.checkExceeded(current_time);
try
{ {
InterpreterQuery interpreter(ast, context, stage); const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
interpreter.execute(ostr, &istr, query_plan);
} if (!ast_insert_query)
catch (...) throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
{
quota.addError(current_time); String format = ast_insert_query->format;
throw; if (format.empty())
format = "Values";
/// Данные могут содержаться в распарсенной (query.data) и ещё не распарсенной (remaining_data_istr) части запроса.
ConcatReadBuffer::ReadBuffers buffers;
ReadBuffer buf1(const_cast<char *>(ast_insert_query->data), ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0, 0);
if (ast_insert_query->data)
buffers.push_back(&buf1);
buffers.push_back(&istr);
/** NOTE Нельзя читать из istr до того, как прочтём всё между query.data и query.end.
* - потому что query.data может ссылаться на кусок памяти, использующийся в качестве буфера в istr.
*/
ConcatReadBuffer data_istr(buffers);
BlockInputStreamPtr in{
context.getFormatFactory().getInput(
format, data_istr, streams.out_sample, context.getSettings().max_insert_block_size)};
copyData(*in, *streams.out);
} }
quota.addQuery(current_time); if (streams.in)
{
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
String format_name = ast_query_with_output && ast_query_with_output->format
? typeid_cast<const ASTIdentifier &>(*ast_query_with_output->format).name
: context.getDefaultFormat();
BlockOutputStreamPtr out = context.getFormatFactory().getOutput(format_name, ostr, streams.in_sample);
copyData(*streams.in, *out);
}
} }
BlockIO executeQuery(
const String & query,
Context & context,
bool internal,
QueryProcessingStage::Enum stage)
{
ASTPtr ast;
ProcessList::EntryPtr process_list_entry;
std::tie(ast, process_list_entry) = prepareQuery(query.data(), query.data() + query.size(), context, internal);
QuotaForIntervals & quota = context.getQuota();
time_t current_time = time(0);
quota.checkExceeded(current_time);
BlockIO res;
try
{
InterpreterQuery interpreter(ast, context, stage);
res = interpreter.execute();
/// Держим элемент списка процессов до конца обработки запроса.
res.process_list_entry = process_list_entry;
}
catch (...)
{
quota.addError(current_time);
throw;
}
quota.addQuery(current_time);
return res;
}
} }

View File

@ -30,8 +30,7 @@ static void executeCreateQuery(const String & query, Context & context, const St
ast_create_query.attach = true; ast_create_query.attach = true;
ast_create_query.database = database; ast_create_query.database = database;
InterpreterCreateQuery interpreter(ast, context); InterpreterCreateQuery(ast, context).executeLoadExisting();
interpreter.execute(true);
} }

View File

@ -130,7 +130,7 @@ BlockInputStreams StorageBuffer::read(
*/ */
if (processed_stage > QueryProcessingStage::FetchColumns) if (processed_stage > QueryProcessingStage::FetchColumns)
for (auto & stream : streams_from_buffers) for (auto & stream : streams_from_buffers)
stream = InterpreterSelectQuery(query, context, processed_stage, 0, stream).execute(); stream = InterpreterSelectQuery(query, context, processed_stage, 0, stream).execute().in;
streams_from_dst.insert(streams_from_dst.end(), streams_from_buffers.begin(), streams_from_buffers.end()); streams_from_dst.insert(streams_from_dst.end(), streams_from_buffers.begin(), streams_from_buffers.end());
return streams_from_dst; return streams_from_dst;

View File

@ -190,7 +190,7 @@ BlockInputStreams StorageDistributed::read(
* Если этого не делать, то в разных потоках будут получаться разные типы (Const и не-Const) столбцов, * Если этого не делать, то в разных потоках будут получаться разные типы (Const и не-Const) столбцов,
* а это не разрешено, так как весь код исходит из допущения, что в потоке блоков все типы одинаковые. * а это не разрешено, так как весь код исходит из допущения, что в потоке блоков все типы одинаковые.
*/ */
res.emplace_back(new MaterializingBlockInputStream(interpreter.execute())); res.emplace_back(new MaterializingBlockInputStream(interpreter.execute().in));
} }
} }

View File

@ -43,22 +43,23 @@ StorageMaterializedView::StorageMaterializedView(
{ {
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_); ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_);
auto inner_table_name = getInnerTableName();
/// Если запрос ATTACH, то к этому моменту внутренняя таблица уже должна быть подключена. /// Если запрос ATTACH, то к этому моменту внутренняя таблица уже должна быть подключена.
if (attach_) if (attach_)
{ {
if (!context.isTableExist(database_name, getInnerTableName())) if (!data)
throw Exception("Inner table is not attached yet." throw Exception("Inner table is not attached yet."
" Materialized view: " + database_name + "." + table_name + "." " Materialized view: " + database_name + "." + table_name + "."
" Inner table: " + database_name + "." + getInnerTableName() + ".", " Inner table: " + database_name + "." + inner_table_name + ".",
DB::ErrorCodes::LOGICAL_ERROR); DB::ErrorCodes::LOGICAL_ERROR);
data = context.getTable(database_name, getInnerTableName());
} }
else else
{ {
/// Составим запрос для создания внутреннего хранилища. /// Составим запрос для создания внутреннего хранилища.
ASTCreateQuery * manual_create_query = new ASTCreateQuery(); ASTCreateQuery * manual_create_query = new ASTCreateQuery();
manual_create_query->database = database_name; manual_create_query->database = database_name;
manual_create_query->table = getInnerTableName(); manual_create_query->table = inner_table_name;
manual_create_query->columns = create.columns; manual_create_query->columns = create.columns;
manual_create_query->children.push_back(manual_create_query->columns); manual_create_query->children.push_back(manual_create_query->columns);
ASTPtr ast_create_query = manual_create_query; ASTPtr ast_create_query = manual_create_query;
@ -78,7 +79,9 @@ StorageMaterializedView::StorageMaterializedView(
/// Выполним запрос. /// Выполним запрос.
InterpreterCreateQuery create_interpreter(ast_create_query, context); InterpreterCreateQuery create_interpreter(ast_create_query, context);
data = create_interpreter.execute(); create_interpreter.execute();
data = context.getTable(database_name, inner_table_name);
} }
} }
@ -115,14 +118,18 @@ BlockOutputStreamPtr StorageMaterializedView::write(ASTPtr query)
void StorageMaterializedView::drop() void StorageMaterializedView::drop()
{ {
context.getGlobalContext().removeDependency(DatabaseAndTableName(select_database_name, select_table_name), DatabaseAndTableName(database_name, table_name)); context.getGlobalContext().removeDependency(
DatabaseAndTableName(select_database_name, select_table_name),
DatabaseAndTableName(database_name, table_name));
if (context.tryGetTable(database_name, getInnerTableName())) auto inner_table_name = getInnerTableName();
if (context.tryGetTable(database_name, inner_table_name))
{ {
/// Состваляем и выполняем запрос drop для внутреннего хранилища. /// Состваляем и выполняем запрос drop для внутреннего хранилища.
ASTDropQuery *drop_query = new ASTDropQuery; ASTDropQuery *drop_query = new ASTDropQuery;
drop_query->database = database_name; drop_query->database = database_name;
drop_query->table = getInnerTableName(); drop_query->table = inner_table_name;
ASTPtr ast_drop_query = drop_query; ASTPtr ast_drop_query = drop_query;
InterpreterDropQuery drop_interpreter(ast_drop_query, context); InterpreterDropQuery drop_interpreter(ast_drop_query, context);
drop_interpreter.execute(); drop_interpreter.execute();