This commit is contained in:
Evgeniy Gatov 2014-12-18 21:07:00 +03:00
commit 05543d9f9a
192 changed files with 9291 additions and 1454 deletions

View File

@ -87,7 +87,7 @@ struct SingleValueDataFixed
void changeIfLess(const Self & to) void changeIfLess(const Self & to)
{ {
if (!has() || to.value < value) if (to.has() && (!has() || to.value < value))
change(to); change(to);
} }
@ -99,7 +99,7 @@ struct SingleValueDataFixed
void changeIfGreater(const Self & to) void changeIfGreater(const Self & to)
{ {
if (!has() || to.value > value) if (to.has() && (!has() || to.value > value))
change(to); change(to);
} }
}; };
@ -258,7 +258,7 @@ struct __attribute__((__packed__)) SingleValueDataString
void changeIfLess(const Self & to) void changeIfLess(const Self & to)
{ {
if (!has() || to.getStringRef() < getStringRef()) if (to.has() && (!has() || to.getStringRef() < getStringRef()))
change(to); change(to);
} }
@ -270,7 +270,7 @@ struct __attribute__((__packed__)) SingleValueDataString
void changeIfGreater(const Self & to) void changeIfGreater(const Self & to)
{ {
if (!has() || to.getStringRef() > getStringRef()) if (to.has() && (!has() || to.getStringRef() > getStringRef()))
change(to); change(to);
} }
}; };
@ -353,7 +353,7 @@ struct SingleValueDataGeneric
void changeIfLess(const Self & to) void changeIfLess(const Self & to)
{ {
if (!has() || to.value < value) if (to.has() && (!has() || to.value < value))
change(to); change(to);
} }
@ -372,7 +372,7 @@ struct SingleValueDataGeneric
void changeIfGreater(const Self & to) void changeIfGreater(const Self & to)
{ {
if (!has() || to.value > value) if (to.has() && (!has() || to.value > value))
change(to); change(to);
} }
}; };

View File

@ -278,7 +278,16 @@ public:
Chars_t & res_chars = res_->chars; Chars_t & res_chars = res_->chars;
Offsets_t & res_offsets = res_->offsets; Offsets_t & res_offsets = res_->offsets;
res_chars.resize(chars.size()); if (limit == size)
res_chars.resize(chars.size());
else
{
size_t new_chars_size = 0;
for (size_t i = 0; i < limit; ++i)
new_chars_size += sizeAt(perm[i]);
res_chars.resize(new_chars_size);
}
res_offsets.resize(limit); res_offsets.resize(limit);
Offset_t current_new_offset = 0; Offset_t current_new_offset = 0;
@ -295,8 +304,6 @@ public:
res_offsets[i] = current_new_offset; res_offsets[i] = current_new_offset;
} }
res_chars.resize(res_offsets[limit - 1]);
return res; return res;
} }

View File

@ -202,12 +202,12 @@ public:
{ {
if (start + length > data.size()) if (start + length > data.size())
throw Exception("Parameters start = " throw Exception("Parameters start = "
+ toString(start) + ", length = " + toString(start) + ", length = "
+ toString(length) + " are out of bound in IColumnVector<T>::cut() method" + toString(length) + " are out of bound in IColumnVector<T>::cut() method"
" (data.size() = " + toString(data.size()) + ").", " (data.size() = " + toString(data.size()) + ").",
ErrorCodes::PARAMETER_OUT_OF_BOUND); ErrorCodes::PARAMETER_OUT_OF_BOUND);
Self * res = new Self(length); Self * res = new Self(length);
memcpy(&res->getData()[0], &data[start], length * sizeof(data[0])); memcpy(&res->getData()[0], &data[start], length * sizeof(data[0]));
return res; return res;
} }

View File

@ -142,7 +142,7 @@ private:
if (nullptr == c_start) if (nullptr == c_start)
throwFromErrno("PODArray: cannot realloc", ErrorCodes::CANNOT_ALLOCATE_MEMORY); throwFromErrno("PODArray: cannot realloc", ErrorCodes::CANNOT_ALLOCATE_MEMORY);
memcpy(c_start, old_c_start, old_c_end_of_storage - old_c_start); memcpy(c_start, old_c_start, std::min(bytes_to_alloc, static_cast<size_t>(end_diff)));
Allocator::deallocate(old_c_start, old_c_end_of_storage - old_c_start); Allocator::deallocate(old_c_start, old_c_end_of_storage - old_c_start);
} }

View File

@ -265,6 +265,7 @@ namespace ErrorCodes
TOO_MUCH_RETRIES_TO_FETCH_PARTS, TOO_MUCH_RETRIES_TO_FETCH_PARTS,
PARTITION_ALREADY_EXISTS, PARTITION_ALREADY_EXISTS,
PARTITION_DOESNT_EXIST, PARTITION_DOESNT_EXIST,
UNION_ALL_RESULT_STRUCTURES_MISMATCH,
POCO_EXCEPTION = 1000, POCO_EXCEPTION = 1000,
STD_EXCEPTION, STD_EXCEPTION,

View File

@ -17,6 +17,8 @@
#include <DB/IO/WriteHelpers.h> #include <DB/IO/WriteHelpers.h>
#include <DB/IO/WriteBufferFromString.h> #include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/DoubleConverter.h>
namespace DB namespace DB
{ {
@ -590,24 +592,17 @@ private:
* *
* NOTE: При таком roundtrip-е, точность может теряться. * NOTE: При таком roundtrip-е, точность может теряться.
*/ */
static inline String formatFloat(Float64 x) static String formatFloat(const Float64 x)
{ {
char tmp[24]; char tmp[25];
int res = std::snprintf(tmp, 23, "%.*g", WRITE_HELPERS_DEFAULT_FLOAT_PRECISION, x); double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
if (res >= 23 || res <= 0) const auto result = getDoubleToStringConverter().ToShortest(x, &builder);
if (!result)
throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER); throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
size_t string_size = res; return { tmp, tmp + builder.position() };
tmp[23] = '\0';
if (string_size == strspn(tmp, "-0123456789"))
{
tmp[string_size] = '.';
++string_size;
}
return {tmp, string_size};
} }
public: public:

View File

@ -1,9 +1,8 @@
#pragma once #pragma once
#include <statdaemons/threadpool.hpp>
#include <DB/Interpreters/Aggregator.h> #include <DB/Interpreters/Aggregator.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h> #include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/ParallelInputsProcessor.h>
namespace DB namespace DB
@ -13,30 +12,34 @@ using Poco::SharedPtr;
/** Агрегирует несколько источников параллельно. /** Агрегирует несколько источников параллельно.
* Запускает агрегацию отдельных источников в отдельных потоках, затем объединяет результаты. * Производит агрегацию блоков из разных источников независимо в разных потоках, затем объединяет результаты.
* Если final == false, агрегатные функции не финализируются, то есть, не заменяются на своё значение, а содержат промежуточное состояние вычислений. * Если final == false, агрегатные функции не финализируются, то есть, не заменяются на своё значение, а содержат промежуточное состояние вычислений.
* Это необходимо, чтобы можно было продолжить агрегацию (например, объединяя потоки частично агрегированных данных). * Это необходимо, чтобы можно было продолжить агрегацию (например, объединяя потоки частично агрегированных данных).
*/ */
class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
{ {
public: public:
ParallelAggregatingBlockInputStream(BlockInputStreams inputs_, const ColumnNumbers & keys_, ParallelAggregatingBlockInputStream(BlockInputStreams inputs, const ColumnNumbers & keys_,
AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, unsigned max_threads_ = 1, AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_,
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW) size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
: aggregator(new Aggregator(keys_, aggregates_, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_)), : aggregator(new Aggregator(keys_, aggregates_, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_)),
has_been_read(false), final(final_), max_threads(max_threads_), pool(std::min(max_threads, inputs_.size())) has_been_read(false), final(final_), max_threads(std::min(inputs.size(), max_threads_)),
keys_size(keys_.size()), aggregates_size(aggregates_.size()),
handler(*this), processor(inputs, max_threads, handler)
{ {
children.insert(children.end(), inputs_.begin(), inputs_.end()); children.insert(children.end(), inputs.begin(), inputs.end());
} }
/** Столбцы из key_names и аргументы агрегатных функций, уже должны быть вычислены. /** Столбцы из key_names и аргументы агрегатных функций, уже должны быть вычислены.
*/ */
ParallelAggregatingBlockInputStream(BlockInputStreams inputs_, const Names & key_names, ParallelAggregatingBlockInputStream(BlockInputStreams inputs, const Names & key_names,
const AggregateDescriptions & aggregates, bool overflow_row_, bool final_, unsigned max_threads_ = 1, const AggregateDescriptions & aggregates, bool overflow_row_, bool final_, size_t max_threads_,
size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW) size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
: has_been_read(false), final(final_), max_threads(max_threads_), pool(std::min(max_threads, inputs_.size())) : has_been_read(false), final(final_), max_threads(std::min(inputs.size(), max_threads_)),
keys_size(key_names.size()), aggregates_size(aggregates.size()),
handler(*this), processor(inputs, max_threads, handler)
{ {
children.insert(children.end(), inputs_.begin(), inputs_.end()); children.insert(children.end(), inputs.begin(), inputs.end());
aggregator = new Aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_); aggregator = new Aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_);
} }
@ -62,6 +65,14 @@ public:
return res.str(); return res.str();
} }
void cancel() override
{
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
return;
processor.cancel();
}
protected: protected:
Block readImpl() override Block readImpl() override
{ {
@ -70,23 +81,54 @@ protected:
has_been_read = true; has_been_read = true;
ManyAggregatedDataVariants many_data(children.size()); many_data.resize(max_threads);
Exceptions exceptions(children.size()); exceptions.resize(max_threads);
for (size_t i = 0, size = many_data.size(); i < size; ++i) for (size_t i = 0; i < max_threads; ++i)
{ threads_data.emplace_back(keys_size, aggregates_size);
many_data[i] = new AggregatedDataVariants;
pool.schedule(std::bind(&ParallelAggregatingBlockInputStream::calculate, this, LOG_TRACE(log, "Aggregating");
std::ref(children[i]), std::ref(*many_data[i]), std::ref(exceptions[i]), current_memory_tracker));
} Stopwatch watch;
pool.wait();
for (auto & elem : many_data)
elem = new AggregatedDataVariants;
processor.process();
processor.wait();
rethrowFirstException(exceptions); rethrowFirstException(exceptions);
if (isCancelled()) if (isCancelled())
return Block(); return Block();
double elapsed_seconds = watch.elapsedSeconds();
size_t total_src_rows = 0;
size_t total_src_bytes = 0;
for (size_t i = 0; i < max_threads; ++i)
{
size_t rows = many_data[i]->size();
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Aggregated. " << threads_data[i].src_rows << " to " << rows << " rows"
<< " (from " << threads_data[i].src_bytes / 1048576.0 << " MiB)"
<< " in " << elapsed_seconds << " sec."
<< " (" << threads_data[i].src_rows / elapsed_seconds << " rows/sec., "
<< threads_data[i].src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
total_src_rows += threads_data[i].src_rows;
total_src_bytes += threads_data[i].src_bytes;
}
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB)"
<< " in " << elapsed_seconds << " sec."
<< " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
AggregatedDataVariantsPtr res = aggregator->merge(many_data); AggregatedDataVariantsPtr res = aggregator->merge(many_data);
if (isCancelled())
return Block();
return aggregator->convertToBlock(*res, final); return aggregator->convertToBlock(*res, final);
} }
@ -95,22 +137,74 @@ private:
bool has_been_read; bool has_been_read;
bool final; bool final;
size_t max_threads; size_t max_threads;
boost::threadpool::pool pool;
/// Вычисления, которые выполняются в отдельном потоке size_t keys_size;
void calculate(BlockInputStreamPtr & input, AggregatedDataVariants & data, ExceptionPtr & exception, MemoryTracker * memory_tracker) size_t aggregates_size;
/** Используется, если есть ограничение на максимальное количество строк при агрегации,
* и если group_by_overflow_mode == ANY.
* В этом случае, новые ключи не добавляются в набор, а производится агрегация только по
* ключам, которые уже успели попасть в набор.
*/
bool no_more_keys = false;
Logger * log = &Logger::get("ParallelAggregatingBlockInputStream");
struct Handler
{ {
current_memory_tracker = memory_tracker; Handler(ParallelAggregatingBlockInputStream & parent_)
: parent(parent_) {}
try void onBlock(Block & block, size_t thread_num)
{ {
aggregator->execute(input, data); parent.aggregator->executeOnBlock(block, *parent.many_data[thread_num],
parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].key_sizes, parent.threads_data[thread_num].key, parent.no_more_keys);
parent.threads_data[thread_num].src_rows += block.rowsInFirstColumn();
parent.threads_data[thread_num].src_bytes += block.bytes();
} }
catch (...)
void onFinish()
{ {
exception = cloneCurrentException();
} }
}
void onException(ExceptionPtr & exception, size_t thread_num)
{
parent.exceptions[thread_num] = exception;
parent.cancel();
}
ParallelAggregatingBlockInputStream & parent;
};
Handler handler;
ParallelInputsProcessor<Handler> processor;
ManyAggregatedDataVariants many_data;
Exceptions exceptions;
struct ThreadData
{
size_t src_rows = 0;
size_t src_bytes = 0;
StringRefs key;
ConstColumnPlainPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
Sizes key_sizes;
ThreadData(size_t keys_size, size_t aggregates_size)
{
key.resize(keys_size);
key_columns.resize(keys_size);
aggregate_columns.resize(aggregates_size);
key_sizes.resize(keys_size);
}
};
std::vector<ThreadData> threads_data;
}; };
} }

View File

@ -0,0 +1,235 @@
#pragma once
#include <list>
#include <queue>
#include <atomic>
#include <thread>
#include <mutex>
#include <Yandex/logger_useful.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
/** Позволяет обработать множество источников блоков параллельно, используя указанное количество потоков.
* Вынимает из любого доступного источника блок и передаёт его на обработку в предоставленный handler.
*
* Устроено так:
* - есть набор источников, из которых можно вынимать блоки;
* - есть набор потоков, которые могут одновременно вынимать блоки из разных источников;
* - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников;
* - когда поток берёт источник для обработки, он удаляет его из очереди источников,
* вынимает из него блок, и затем кладёт источник обратно в очередь источников;
*/
namespace DB
{
/// Пример обработчика.
struct ParallelInputsHandler
{
/// Обработка блока данных.
void onBlock(Block & block, size_t thread_num) {}
/// Блоки закончились. Из-за того, что все источники иссякли или из-за отмены работы.
/// Этот метод всегда вызывается ровно один раз, в конце работы, если метод onException не кидает исключение.
void onFinish() {}
/// Обработка исключения. Разумно вызывать в этом методе метод ParallelInputsProcessor::cancel, а также передавать эксепшен в основной поток.
void onException(ExceptionPtr & exception, size_t thread_num) {}
};
template <typename Handler>
class ParallelInputsProcessor
{
public:
ParallelInputsProcessor(BlockInputStreams inputs_, size_t max_threads_, Handler & handler_)
: inputs(inputs_), max_threads(std::min(inputs_.size(), max_threads_)), handler(handler_)
{
for (size_t i = 0; i < inputs_.size(); ++i)
input_queue.emplace(inputs_[i], i);
}
~ParallelInputsProcessor()
{
try
{
wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/// Запустить фоновые потоки, начать работу.
void process()
{
active_threads = max_threads;
threads.reserve(max_threads);
for (size_t i = 0; i < max_threads; ++i)
threads.emplace_back(std::bind(&ParallelInputsProcessor::thread, this, current_memory_tracker, i));
}
/// Попросить все источники остановиться раньше, чем они иссякнут.
void cancel()
{
finish = true;
for (auto & input : inputs)
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*input))
{
try
{
child->cancel();
}
catch (...)
{
/** Если не удалось попросить остановиться одного или несколько источников.
* (например, разорвано соединение при распределённой обработке запроса)
* - то пофиг.
*/
LOG_ERROR(log, "Exception while cancelling " << child->getName());
}
}
}
}
/// Подождать завершения работы всех потоков раньше деструктора.
void wait()
{
if (joined_threads)
return;
for (auto & thread : threads)
thread.join();
threads.clear();
joined_threads = true;
}
size_t getNumActiveThreads() const
{
return active_threads;
}
private:
/// Данные отдельного источника
struct InputData
{
BlockInputStreamPtr in;
size_t i; /// Порядковый номер источника (для отладки).
InputData() {}
InputData(BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {}
};
void thread(MemoryTracker * memory_tracker, size_t thread_num)
{
current_memory_tracker = memory_tracker;
ExceptionPtr exception;
try
{
loop(thread_num);
}
catch (...)
{
exception = cloneCurrentException();
}
if (exception)
{
handler.onException(exception, thread_num);
}
/// Последний поток при выходе сообщает, что данных больше нет.
if (0 == --active_threads)
{
handler.onFinish();
}
}
void loop(size_t thread_num)
{
while (!finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут.
{
InputData input;
/// Выбираем следующий источник.
{
std::lock_guard<std::mutex> lock(input_queue_mutex);
/// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.)
if (input_queue.empty())
break;
input = input_queue.front();
/// Убираем источник из очереди доступных источников.
input_queue.pop();
}
/// Основная работа.
Block block = input.in->read();
{
if (finish)
break;
/// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых.
{
std::lock_guard<std::mutex> lock(input_queue_mutex);
if (block)
{
input_queue.push(input);
}
else
{
if (input_queue.empty())
break;
}
}
if (finish)
break;
if (block)
handler.onBlock(block, thread_num);
}
}
}
BlockInputStreams inputs;
unsigned max_threads;
Handler & handler;
/// Потоки.
typedef std::vector<std::thread> ThreadsData;
ThreadsData threads;
/// Очередь доступных источников, которые не заняты каким-либо потоком в данный момент.
typedef std::queue<InputData> InputQueue;
InputQueue input_queue;
/// Для операций с input_queue.
std::mutex input_queue_mutex;
/// Сколько источников иссякло.
std::atomic<size_t> active_threads { 0 };
/// Завершить работу потоков (раньше, чем иссякнут источники).
std::atomic<bool> finish { false };
/// Подождали завершения всех потоков.
std::atomic<bool> joined_threads { false };
Logger * log = &Logger::get("ParallelInputsProcessor");
};
}

View File

@ -21,9 +21,6 @@ public:
PushingToViewsBlockOutputStream(String database_, String table_, const Context & context_, ASTPtr query_ptr_) PushingToViewsBlockOutputStream(String database_, String table_, const Context & context_, ASTPtr query_ptr_)
: database(database_), table(table_), context(context_), query_ptr(query_ptr_) : database(database_), table(table_), context(context_), query_ptr(query_ptr_)
{ {
if (database.empty())
database = context.getCurrentDatabase();
storage = context.getTable(database, table); storage = context.getTable(database, table);
addTableLock(storage->lockStructure(true)); addTableLock(storage->lockStructure(true));

View File

@ -29,24 +29,28 @@ private:
public: public:
/// Принимает готовое соединение. /// Принимает готовое соединение.
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
: connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_) const Context & context = Context{})
: connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
{ {
init(settings_); init(settings_);
} }
/// Принимает готовое соединение. Захватывает владение соединением из пула. /// Принимает готовое соединение. Захватывает владение соединением из пула.
RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_, RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
: pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), external_tables(external_tables_), stage(stage_) const Context & context = Context{})
: pool_entry(pool_entry_), connection(&*pool_entry_), query(query_),
external_tables(external_tables_), stage(stage_), context(context)
{ {
init(settings_); init(settings_);
} }
/// Принимает пул, из которого нужно будет достать соединение. /// Принимает пул, из которого нужно будет достать соединение.
RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_, RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
: pool(pool_), query(query_), external_tables(external_tables_), stage(stage_) const Context & context = Context{})
: pool(pool_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
{ {
init(settings_); init(settings_);
} }
@ -103,7 +107,8 @@ protected:
{ {
StoragePtr cur = table.second; StoragePtr cur = table.second;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete; QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), settings, stage, DEFAULT_BLOCK_SIZE, 1); DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings,
stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0) if (input.size() == 0)
res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first)); res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first));
else else
@ -246,6 +251,7 @@ private:
/// Временные таблицы, которые необходимо переслать на удаленные сервера. /// Временные таблицы, которые необходимо переслать на удаленные сервера.
Tables external_tables; Tables external_tables;
QueryProcessingStage::Enum stage; QueryProcessingStage::Enum stage;
Context context;
/// Отправили запрос (это делается перед получением первого блока). /// Отправили запрос (это делается перед получением первого блока).
bool sent_query = false; bool sent_query = false;

View File

@ -1,14 +1,10 @@
#pragma once #pragma once
#include <list>
#include <queue>
#include <atomic>
#include <thread>
#include <Yandex/logger_useful.h> #include <Yandex/logger_useful.h>
#include <DB/Common/ConcurrentBoundedQueue.h> #include <DB/Common/ConcurrentBoundedQueue.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h> #include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/ParallelInputsProcessor.h>
namespace DB namespace DB
@ -23,25 +19,19 @@ using Poco::SharedPtr;
* в которых будет выполняться получение данных из разных источников. * в которых будет выполняться получение данных из разных источников.
* *
* Устроено так: * Устроено так:
* - есть набор источников, из которых можно вынимать блоки; * - с помощью ParallelInputsProcessor в нескольких потоках вынимает из источников блоки;
* - есть набор потоков, которые могут одновременно вынимать блоки из разных источников;
* - "свободные" источники (с которыми сейчас не работает никакой поток) кладутся в очередь источников;
* - когда поток берёт источник для обработки, он удаляет его из очереди источников,
* вынимает из него блок, и затем кладёт источник обратно в очередь источников;
* - полученные блоки складываются в ограниченную очередь готовых блоков; * - полученные блоки складываются в ограниченную очередь готовых блоков;
* - основной поток вынимает готовые блоки из очереди готовых блоков. * - основной поток вынимает готовые блоки из очереди готовых блоков.
*/ */
class UnionBlockInputStream : public IProfilingBlockInputStream class UnionBlockInputStream : public IProfilingBlockInputStream
{ {
public: public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1) UnionBlockInputStream(BlockInputStreams inputs, unsigned max_threads)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))), : output_queue(max_threads), handler(*this), processor(inputs, max_threads, handler)
output_queue(max_threads)
{ {
children.insert(children.end(), inputs_.begin(), inputs_.end()); children = inputs;
for (size_t i = 0; i < inputs_.size(); ++i)
input_queue.emplace(inputs_[i], i);
} }
String getName() const override { return "UnionBlockInputStream"; } String getName() const override { return "UnionBlockInputStream"; }
@ -77,7 +67,7 @@ public:
} }
catch (...) catch (...)
{ {
LOG_ERROR(log, "Exception while destroying UnionBlockInputStream."); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
} }
@ -89,44 +79,61 @@ public:
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true)) if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
return; return;
finish = true; //std::cerr << "cancelling\n";
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it) processor.cancel();
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&**it))
{
try
{
child->cancel();
}
catch (...)
{
LOG_ERROR(log, "Exception while cancelling " << child->getName());
}
}
}
} }
protected: protected:
void finalize() void finalize()
{ {
if (threads.empty()) if (!started)
return; return;
LOG_TRACE(log, "Waiting for threads to finish"); LOG_TRACE(log, "Waiting for threads to finish");
/// Вынем всё, что есть в очереди готовых данных. ExceptionPtr exception;
output_queue.clear(); if (!all_read)
{
/** Прочитаем всё до конца, чтобы ParallelInputsProcessor не заблокировался при попытке вставить в очередь.
* Может быть, в очереди есть ещё эксепшен.
*/
OutputData res;
while (true)
{
//std::cerr << "popping\n";
output_queue.pop(res);
/// В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится. if (res.exception)
for (auto & thread : threads) {
thread.join(); if (!exception)
exception = res.exception;
else if (DB::Exception * e = dynamic_cast<DB::Exception *>(&*exception))
e->addMessage("\n" + res.exception->displayText());
}
else if (!res.block)
break;
}
threads.clear(); all_read = true;
}
processor.wait();
LOG_TRACE(log, "Waited for threads to finish"); LOG_TRACE(log, "Waited for threads to finish");
if (exception)
exception->rethrow();
} }
/** Возможны следующие варианты:
* 1. Функция readImpl вызывается до тех пор, пока она не вернёт пустой блок.
* Затем вызывается функция readSuffix и затем деструктор.
* 2. Вызывается функция readImpl. В какой-то момент, возможно из другого потока вызывается функция cancel.
* Затем вызывается функция readSuffix и затем деструктор.
* 3. В любой момент, объект может быть и так уничтожен (вызываться деструктор).
*/
Block readImpl() override Block readImpl() override
{ {
OutputData res; OutputData res;
@ -134,14 +141,14 @@ protected:
return res.block; return res.block;
/// Запускаем потоки, если это ещё не было сделано. /// Запускаем потоки, если это ещё не было сделано.
if (threads.empty()) if (!started)
{ {
threads.reserve(max_threads); started = true;
for (size_t i = 0; i < max_threads; ++i) processor.process();
threads.emplace_back([=] { thread(current_memory_tracker); });
} }
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение. /// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
//std::cerr << "popping\n";
output_queue.pop(res); output_queue.pop(res);
if (res.exception) if (res.exception)
@ -153,17 +160,13 @@ protected:
return res.block; return res.block;
} }
/// Вызывается либо после того, как всё прочитано, либо после cancel-а.
void readSuffix() override void readSuffix() override
{ {
//std::cerr << "readSuffix\n";
if (!all_read && !is_cancelled) if (!all_read && !is_cancelled)
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR); throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
/// Может быть, в очереди есть ещё эксепшен.
OutputData res;
while (output_queue.tryPop(res))
if (res.exception)
res.exception->rethrow();
finalize(); finalize();
for (size_t i = 0; i < children.size(); ++i) for (size_t i = 0; i < children.size(); ++i)
@ -171,118 +174,6 @@ protected:
} }
private: private:
/// Данные отдельного источника
struct InputData
{
BlockInputStreamPtr in;
size_t i; /// Порядковый номер источника (для отладки).
InputData() {}
InputData(BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {}
};
void thread(MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
ExceptionPtr exception;
try
{
loop();
}
catch (...)
{
exception = cloneCurrentException();
}
if (exception)
{
/// Отдаём эксепшен в основной поток.
output_queue.push(exception);
try
{
cancel();
}
catch (...)
{
/** Если не удалось попросить остановиться одного или несколько источников.
* (например, разорвано соединение при распределённой обработке запроса)
* - то пофиг.
*/
}
}
}
void loop()
{
while (!finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут.
{
InputData input;
/// Выбираем следующий источник.
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
/// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.)
if (input_queue.empty())
break;
input = input_queue.front();
/// Убираем источник из очереди доступных источников.
input_queue.pop();
}
/// Основная работа.
Block block = input.in->read();
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
/// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых.
if (block)
{
input_queue.push(input);
if (finish)
break;
output_queue.push(block);
}
else
{
++exhausted_inputs;
/// Если все источники иссякли.
if (exhausted_inputs == children.size())
{
finish = true;
break;
}
}
}
}
if (finish)
{
/// Отдаём в основной поток пустой блок, что означает, что данных больше нет; только один раз.
if (false == pushed_end_of_output_queue.exchange(true))
output_queue.push(OutputData());
}
}
unsigned max_threads;
/// Потоки.
typedef std::vector<std::thread> ThreadsData;
ThreadsData threads;
/// Очередь доступных источников, которые не заняты каким-либо потоком в данный момент.
typedef std::queue<InputData> InputQueue;
InputQueue input_queue;
/// Блок или эксепшен. /// Блок или эксепшен.
struct OutputData struct OutputData
{ {
@ -294,22 +185,52 @@ private:
OutputData(ExceptionPtr & exception_) : exception(exception_) {} OutputData(ExceptionPtr & exception_) : exception(exception_) {}
}; };
/// Очередь готовых блоков. Также туда можно положить эксепшен вместо блока. /** Очередь готовых блоков. Также туда можно положить эксепшен вместо блока.
* Когда данные закончатся - в очередь вставляется пустой блок.
* В очередь всегда (даже после исключения или отмены запроса) рано или поздно вставляется пустой блок.
* Очередь всегда (даже после исключения или отмены запроса, даже в деструкторе) нужно дочитывать до пустого блока,
* иначе ParallelInputsProcessor может заблокироваться при вставке в очередь.
*/
typedef ConcurrentBoundedQueue<OutputData> OutputQueue; typedef ConcurrentBoundedQueue<OutputData> OutputQueue;
OutputQueue output_queue; OutputQueue output_queue;
/// Для операций с input_queue.
Poco::FastMutex mutex;
/// Сколько источников иссякло. struct Handler
size_t exhausted_inputs = 0; {
Handler(UnionBlockInputStream & parent_) : parent(parent_) {}
/// Завершить работу потоков (раньше, чем иссякнут источники). void onBlock(Block & block, size_t thread_num)
std::atomic<bool> finish { false }; {
/// Положили ли в output_queue пустой блок. //std::cerr << "pushing block\n";
std::atomic<bool> pushed_end_of_output_queue { false }; parent.output_queue.push(block);
}
bool all_read { false }; void onFinish()
{
//std::cerr << "pushing end\n";
parent.output_queue.push(OutputData());
}
void onException(ExceptionPtr & exception, size_t thread_num)
{
//std::cerr << "pushing exception\n";
/// Порядок строк имеет значение. Если его поменять, то возможна ситуация,
/// когда перед эксепшеном, в очередь окажется вставлен пустой блок (конец данных),
/// и эксепшен потеряется.
parent.output_queue.push(exception);
parent.cancel(); /// Не кидает исключений.
}
UnionBlockInputStream & parent;
};
Handler handler;
ParallelInputsProcessor<Handler> processor;
bool started = false;
bool all_read = false;
Logger * log = &Logger::get("UnionBlockInputStream"); Logger * log = &Logger::get("UnionBlockInputStream");
}; };

View File

@ -29,7 +29,7 @@ public:
{ {
return "FixedString(" + toString(n) + ")"; return "FixedString(" + toString(n) + ")";
} }
DataTypePtr clone() const DataTypePtr clone() const
{ {
return new DataTypeFixedString(n); return new DataTypeFixedString(n);

View File

@ -22,7 +22,7 @@ public:
{ {
return "String"; return "String";
} }
DataTypePtr clone() const DataTypePtr clone() const
{ {
return new DataTypeString; return new DataTypeString;

View File

@ -30,7 +30,7 @@ public:
/// Если тип числовой, уместны ли с ним все арифметические операции и приведение типов. /// Если тип числовой, уместны ли с ним все арифметические операции и приведение типов.
/// true для чисел, false для даты и даты-с-временем. /// true для чисел, false для даты и даты-с-временем.
virtual bool behavesAsNumber() const { return false; } virtual bool behavesAsNumber() const { return false; }
/// Клонировать /// Клонировать
virtual SharedPtr<IDataType> clone() const = 0; virtual SharedPtr<IDataType> clone() const = 0;

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <Poco/SharedPtr.h>
#include <DB/Functions/IFunction.h> #include <DB/Functions/IFunction.h>
#include <Yandex/singleton.h>
namespace DB namespace DB
@ -14,7 +14,7 @@ class Context;
* Функция при создании также может использовать для инициализации (например, захватить SharedPtr) * Функция при создании также может использовать для инициализации (например, захватить SharedPtr)
* какие-нибудь справочники, находящиеся в Context-е. * какие-нибудь справочники, находящиеся в Context-е.
*/ */
class FunctionFactory class FunctionFactory : public Singleton<FunctionFactory>
{ {
private: private:
typedef IFunction* (*Creator)(const Context & context); /// Не std::function, так как меньше indirection и размер объекта. typedef IFunction* (*Creator)(const Context & context); /// Не std::function, так как меньше indirection и размер объекта.

View File

@ -736,7 +736,7 @@ public:
/// Выполнить функцию над блоком. /// Выполнить функцию над блоком.
void execute(Block & block, const ColumnNumbers & arguments, size_t result) void execute(Block & block, const ColumnNumbers & arguments, size_t result)
{ {
RegionsNames::SupportedLanguages::Enum language = RegionsNames::SupportedLanguages::RU; RegionsNames::Language language = RegionsNames::Language::RU;
/// Если указан язык результата /// Если указан язык результата
if (arguments.size() == 2) if (arguments.size() == 2)

View File

@ -0,0 +1,28 @@
#pragma once
#include <src/double-conversion.h>
namespace DB
{
template <bool emit_decimal_point> struct DoubleToStringConverterFlags
{
static constexpr auto flags = double_conversion::DoubleToStringConverter::NO_FLAGS;
};
template <> struct DoubleToStringConverterFlags<true>
{
static constexpr auto flags = double_conversion::DoubleToStringConverter::EMIT_TRAILING_DECIMAL_POINT;
};
template <bool emit_decimal_point = true>
const double_conversion::DoubleToStringConverter & getDoubleToStringConverter()
{
static const double_conversion::DoubleToStringConverter instance{
DoubleToStringConverterFlags<emit_decimal_point>::flags, "inf", "nan", 'e', -6, 21, 6, 1
};
return instance;
}
}

View File

@ -12,6 +12,7 @@
#include <Yandex/logger_useful.h> #include <Yandex/logger_useful.h>
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800 #define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
namespace DB namespace DB
@ -36,8 +37,10 @@ public:
const std::string & host_, const std::string & host_,
int port_, int port_,
const Params & params, const Params & params,
size_t timeout_ = 0, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) const Poco::Timespan & connection_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, 0),
const Poco::Timespan & send_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0),
const Poco::Timespan & receive_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0))
: ReadBuffer(nullptr, 0), host(host_), port(port_) : ReadBuffer(nullptr, 0), host(host_), port(port_)
{ {
std::stringstream uri; std::stringstream uri;
@ -59,7 +62,7 @@ public:
session.setPort(port); session.setPort(port);
/// устанавливаем таймаут /// устанавливаем таймаут
session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0)); session.setTimeout(connection_timeout, send_timeout, receive_timeout);
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str()); Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str());
Poco::Net::HTTPResponse response; Poco::Net::HTTPResponse response;

View File

@ -3,6 +3,9 @@
#include <DB/IO/ReadBufferFromHTTP.h> #include <DB/IO/ReadBufferFromHTTP.h>
#include "ReadHelpers.h" #include "ReadHelpers.h"
#define DEFAULT_REMOTE_READ_BUFFER_CONNECTION_TIMEOUT 1
#define DEFAULT_REMOTE_READ_BUFFER_RECEIVE_TIMEOUT 1800
#define DEFAULT_REMOTE_READ_BUFFER_SEND_TIMEOUT 1800
namespace DB namespace DB
{ {
@ -20,8 +23,10 @@ public:
int port, int port,
const std::string & path, const std::string & path,
bool compress = true, bool compress = true,
size_t timeout = 0, size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE,
size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE) const Poco::Timespan & connection_timeout = Poco::Timespan(DEFAULT_REMOTE_READ_BUFFER_CONNECTION_TIMEOUT, 0),
const Poco::Timespan & send_timeout = Poco::Timespan(DEFAULT_REMOTE_READ_BUFFER_SEND_TIMEOUT, 0),
const Poco::Timespan & receive_timeout = Poco::Timespan(DEFAULT_REMOTE_READ_BUFFER_RECEIVE_TIMEOUT, 0))
: ReadBuffer(nullptr, 0) : ReadBuffer(nullptr, 0)
{ {
ReadBufferFromHTTP::Params params = { ReadBufferFromHTTP::Params params = {
@ -29,7 +34,7 @@ public:
std::make_pair("path", path), std::make_pair("path", path),
std::make_pair("compress", (compress ? "true" : "false"))}; std::make_pair("compress", (compress ? "true" : "false"))};
impl = new ReadBufferFromHTTP(host, port, params, timeout, buffer_size); impl = new ReadBufferFromHTTP(host, port, params, buffer_size, connection_timeout, send_timeout, receive_timeout);
} }
bool nextImpl() bool nextImpl()

View File

@ -13,7 +13,9 @@
#include <Yandex/logger_useful.h> #include <Yandex/logger_useful.h>
#define DEFAULT_REMOTE_WRITE_BUFFER_TIMEOUT 1800 #define DEFAULT_REMOTE_WRITE_BUFFER_CONNECTION_TIMEOUT 1
#define DEFAULT_REMOTE_WRITE_BUFFER_RECEIVE_TIMEOUT 1800
#define DEFAULT_REMOTE_WRITE_BUFFER_SEND_TIMEOUT 1800
namespace DB namespace DB
@ -49,8 +51,12 @@ public:
*/ */
RemoteWriteBuffer(const std::string & host_, int port_, const std::string & path_, RemoteWriteBuffer(const std::string & host_, int port_, const std::string & path_,
const std::string & tmp_path_ = "", const std::string & if_exists_ = "remove", const std::string & tmp_path_ = "", const std::string & if_exists_ = "remove",
bool decompress_ = false, size_t timeout_ = 0, unsigned connection_retries_ = 3, bool decompress_ = false,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) unsigned connection_retries_ = 3,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const Poco::Timespan & connection_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_CONNECTION_TIMEOUT, 0),
const Poco::Timespan & send_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_SEND_TIMEOUT, 0),
const Poco::Timespan & receive_timeout = Poco::Timespan(DEFAULT_REMOTE_WRITE_BUFFER_RECEIVE_TIMEOUT, 0))
: WriteBuffer(nullptr, 0), host(host_), port(port_), path(path_), : WriteBuffer(nullptr, 0), host(host_), port(port_), path(path_),
tmp_path(tmp_path_), if_exists(if_exists_), tmp_path(tmp_path_), if_exists(if_exists_),
decompress(decompress_), connection_retries(connection_retries_), finalized(false) decompress(decompress_), connection_retries(connection_retries_), finalized(false)
@ -72,7 +78,7 @@ public:
session.setKeepAlive(true); session.setKeepAlive(true);
/// устанавливаем таймаут /// устанавливаем таймаут
session.setTimeout(Poco::Timespan((timeout_ ? timeout_ : DEFAULT_REMOTE_WRITE_BUFFER_TIMEOUT), 0)); session.setTimeout(connection_timeout, send_timeout, receive_timeout);
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri_str, Poco::Net::HTTPRequest::HTTP_1_1); Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri_str, Poco::Net::HTTPRequest::HTTP_1_1);

View File

@ -9,6 +9,7 @@
#include <Yandex/DateLUT.h> #include <Yandex/DateLUT.h>
#include <mysqlxx/Row.h> #include <mysqlxx/Row.h>
#include <mysqlxx/Null.h>
#include <DB/Core/Types.h> #include <DB/Core/Types.h>
#include <DB/Core/Exception.h> #include <DB/Core/Exception.h>
@ -18,10 +19,9 @@
#include <DB/IO/WriteIntText.h> #include <DB/IO/WriteIntText.h>
#include <DB/IO/VarInt.h> #include <DB/IO/VarInt.h>
#include <DB/IO/WriteBufferFromString.h> #include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/DoubleConverter.h>
#include <city.h> #include <city.h>
#define WRITE_HELPERS_DEFAULT_FLOAT_PRECISION 6U
namespace DB namespace DB
{ {
@ -86,15 +86,17 @@ inline void writeBoolText(bool x, WriteBuffer & buf)
template <typename T> template <typename T>
void writeFloatText(T x, WriteBuffer & buf, unsigned precision = WRITE_HELPERS_DEFAULT_FLOAT_PRECISION) void writeFloatText(T x, WriteBuffer & buf)
{ {
char tmp[24]; char tmp[25];
int res = std::snprintf(tmp, 24, "%.*g", precision, x); double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
if (res >= 24 || res <= 0) const auto result = getDoubleToStringConverter<false>().ToShortest(x, &builder);
if (!result)
throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER); throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
buf.write(tmp, res); buf.write(tmp, builder.position());
} }
@ -356,9 +358,9 @@ inline void writeDateText(mysqlxx::Date date, WriteBuffer & buf)
/// в формате YYYY-MM-DD HH:MM:SS, согласно текущему часовому поясу /// в формате YYYY-MM-DD HH:MM:SS, согласно текущему часовому поясу
inline void writeDateTimeText(time_t datetime, WriteBuffer & buf) inline void writeDateTimeText(time_t datetime, WriteBuffer & buf, char date_delimeter = '-', char time_delimeter = ':')
{ {
char s[19] = {'0', '0', '0', '0', '-', '0', '0', '-', '0', '0', ' ', '0', '0', ':', '0', '0', ':', '0', '0'}; char s[19] = {'0', '0', '0', '0', date_delimeter, '0', '0', date_delimeter, '0', '0', ' ', '0', '0', time_delimeter, '0', '0', time_delimeter, '0', '0'};
if (unlikely(datetime > DATE_LUT_MAX || datetime == 0)) if (unlikely(datetime > DATE_LUT_MAX || datetime == 0))
{ {
@ -392,9 +394,9 @@ inline void writeDateTimeText(time_t datetime, WriteBuffer & buf)
buf.write(s, 19); buf.write(s, 19);
} }
inline void writeDateTimeText(mysqlxx::DateTime datetime, WriteBuffer & buf) inline void writeDateTimeText(mysqlxx::DateTime datetime, WriteBuffer & buf, char date_delimeter = '-', char time_delimeter = ':')
{ {
char s[19] = {'0', '0', '0', '0', '-', '0', '0', '-', '0', '0', ' ', '0', '0', ':', '0', '0', ':', '0', '0'}; char s[19] = {'0', '0', '0', '0', date_delimeter, '0', '0', date_delimeter, '0', '0', ' ', '0', '0', time_delimeter, '0', '0', time_delimeter, '0', '0'};
s[0] += datetime.year() / 1000; s[0] += datetime.year() / 1000;
s[1] += (datetime.year() / 100) % 10; s[1] += (datetime.year() / 100) % 10;
@ -473,6 +475,15 @@ inline void writeText(const VisitID_t & x, WriteBuffer & buf) { writeIntText(st
inline void writeText(const mysqlxx::Date & x, WriteBuffer & buf) { writeDateText(x, buf); } inline void writeText(const mysqlxx::Date & x, WriteBuffer & buf) { writeDateText(x, buf); }
inline void writeText(const mysqlxx::DateTime & x, WriteBuffer & buf) { writeDateTimeText(x, buf); } inline void writeText(const mysqlxx::DateTime & x, WriteBuffer & buf) { writeDateTimeText(x, buf); }
template<typename T>
inline void writeText(const mysqlxx::Null<T> & x, WriteBuffer & buf)
{
if (x.isNull())
writeCString("\\N", buf);
else
writeText(static_cast<const T &>(x), buf);
}
/// Методы для вывода в текстовом виде в кавычках /// Методы для вывода в текстовом виде в кавычках
inline void writeQuoted(const UInt8 & x, WriteBuffer & buf) { writeIntText(x, buf); } inline void writeQuoted(const UInt8 & x, WriteBuffer & buf) { writeIntText(x, buf); }
@ -507,6 +518,15 @@ inline void writeQuoted(const mysqlxx::DateTime & x, WriteBuffer & buf)
writeChar('\'', buf); writeChar('\'', buf);
} }
template <typename T>
inline void writeQuoted(const mysqlxx::Null<T> & x, WriteBuffer & buf)
{
if (x.isNull())
writeCString("NULL", buf);
else
writeText(static_cast<const T &>(x), buf);
}
/// В двойных кавычках /// В двойных кавычках
inline void writeDoubleQuoted(const UInt8 & x, WriteBuffer & buf) { writeIntText(x, buf); } inline void writeDoubleQuoted(const UInt8 & x, WriteBuffer & buf) { writeIntText(x, buf); }

View File

@ -14,7 +14,6 @@
#include <DB/Storages/MarkCache.h> #include <DB/Storages/MarkCache.h>
#include <DB/DataStreams/FormatFactory.h> #include <DB/DataStreams/FormatFactory.h>
#include <DB/Storages/IStorage.h> #include <DB/Storages/IStorage.h>
#include <DB/Functions/FunctionFactory.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h> #include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/DataTypes/DataTypeFactory.h> #include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Storages/StorageFactory.h> #include <DB/Storages/StorageFactory.h>
@ -83,7 +82,6 @@ struct ContextShared
String path; /// Путь к директории с данными, со слешем на конце. String path; /// Путь к директории с данными, со слешем на конце.
Databases databases; /// Список БД и таблиц в них. Databases databases; /// Список БД и таблиц в них.
TableFunctionFactory table_function_factory; /// Табличные функции. TableFunctionFactory table_function_factory; /// Табличные функции.
FunctionFactory function_factory; /// Обычные функции.
AggregateFunctionFactory aggregate_function_factory; /// Агрегатные функции. AggregateFunctionFactory aggregate_function_factory; /// Агрегатные функции.
DataTypeFactory data_type_factory; /// Типы данных. DataTypeFactory data_type_factory; /// Типы данных.
StorageFactory storage_factory; /// Движки таблиц. StorageFactory storage_factory; /// Движки таблиц.
@ -254,7 +252,6 @@ public:
void setSetting(const String & name, const std::string & value); void setSetting(const String & name, const std::string & value);
const TableFunctionFactory & getTableFunctionFactory() const { return shared->table_function_factory; } const TableFunctionFactory & getTableFunctionFactory() const { return shared->table_function_factory; }
const FunctionFactory & getFunctionFactory() const { return shared->function_factory; }
const AggregateFunctionFactory & getAggregateFunctionFactory() const { return shared->aggregate_function_factory; } const AggregateFunctionFactory & getAggregateFunctionFactory() const { return shared->aggregate_function_factory; }
const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; } const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; }
const StorageFactory & getStorageFactory() const { return shared->storage_factory; } const StorageFactory & getStorageFactory() const { return shared->storage_factory; }

View File

@ -10,114 +10,129 @@
namespace DB namespace DB
{ {
/** Интерпретирует запрос SELECT. Возвращает поток блоков с результатами выполнения запроса до стадии to_stage. /** Интерпретирует запрос SELECT. Возвращает поток блоков с результатами выполнения запроса до стадии to_stage.
*/ */
class InterpreterSelectQuery class InterpreterSelectQuery
{ {
public: public:
/** to_stage /** to_stage
* - стадия, до которой нужно выполнить запрос. По-умолчанию - до конца. * - стадия, до которой нужно выполнить запрос. По-умолчанию - до конца.
* Можно выполнить до промежуточного состояния агрегации, которые объединяются с разных серверов при распределённой обработке запроса. * Можно выполнить до промежуточного состояния агрегации, которые объединяются с разных серверов при распределённой обработке запроса.
* *
* subquery_depth * subquery_depth
* - для контроля ограничений на глубину вложенности подзапросов. Для подзапросов передаётся значение, увеличенное на единицу. * - для контроля ограничений на глубину вложенности подзапросов. Для подзапросов передаётся значение, увеличенное на единицу.
* *
* input * input
* - если задан - читать не из таблицы, указанной в запросе, а из готового источника. * - если задан - читать не из таблицы, указанной в запросе, а из готового источника.
* *
* required_column_names * required_column_names
* - удалить из запроса все столбцы кроме указанных - используется для удаления ненужных столбцов из подзапросов. * - удалить из запроса все столбцы кроме указанных - используется для удаления ненужных столбцов из подзапросов.
* *
* table_column_names * table_column_names
* - поместить в контекст в качестве известных столбцов только указанные столбцы, а не все столбцы таблицы. * - поместить в контекст в качестве известных столбцов только указанные столбцы, а не все столбцы таблицы.
* Используется, например, совместно с указанием input. * Используется, например, совместно с указанием input.
*/ */
InterpreterSelectQuery( InterpreterSelectQuery(
ASTPtr query_ptr_, ASTPtr query_ptr_,
const Context & context_, const Context & context_,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0, size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr); BlockInputStreamPtr input = nullptr,
bool is_union_all_head_ = true);
InterpreterSelectQuery( InterpreterSelectQuery(
ASTPtr query_ptr_, ASTPtr query_ptr_,
const Context & context_, const Context & context_,
const Names & required_column_names, const Names & required_column_names,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0, size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr); BlockInputStreamPtr input = nullptr);
InterpreterSelectQuery( InterpreterSelectQuery(
ASTPtr query_ptr_, ASTPtr query_ptr_,
const Context & context_, const Context & context_,
const Names & required_column_names, const Names & required_column_names,
const NamesAndTypesList & table_column_names, const NamesAndTypesList & table_column_names,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete, QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0, size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr); BlockInputStreamPtr input = nullptr);
/// Выполнить запрос, получить поток блоков для чтения /** Выполнить запрос, возможно являющиийся цепочкой UNION ALL.
BlockInputStreamPtr execute(); * Получить поток блоков для чтения
*/
BlockInputStreamPtr execute();
/** Выполнить запрос, записать результат в нужном формате в buf. /** Выполнить запрос, записать результат в нужном формате в buf.
* BlockInputStreamPtr возвращается, чтобы можно было потом получить информацию о плане выполнения запроса. * BlockInputStreamPtr возвращается, чтобы можно было потом получить информацию о плане выполнения запроса.
*/ */
BlockInputStreamPtr executeAndFormat(WriteBuffer & buf); BlockInputStreamPtr executeAndFormat(WriteBuffer & buf);
DataTypes getReturnTypes(); DataTypes getReturnTypes();
Block getSampleBlock(); Block getSampleBlock();
private: private:
typedef Poco::SharedPtr<ExpressionAnalyzer> ExpressionAnalyzerPtr; typedef Poco::SharedPtr<ExpressionAnalyzer> ExpressionAnalyzerPtr;
void init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names = NamesAndTypesList()); void init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names = NamesAndTypesList());
/** Из какой таблицы читать. JOIN-ы не поддерживаются. /// Выполнить один запрос SELECT из цепочки UNION ALL.
*/ void executeSingleQuery(bool should_perform_union_hint = true);
void getDatabaseAndTableNames(String & database_name, String & table_name);
/** Выбрать из списка столбцов какой-нибудь, лучше - минимального размера. /// Является ли это первым запросом цепочки UNION ALL имеющей длниу >= 2.
*/ bool isFirstSelectInsideUnionAll() const;
String getAnyColumn();
/// Разные стадии выполнения запроса. /** Из какой таблицы читать. JOIN-ы не поддерживаются.
*/
void getDatabaseAndTableNames(String & database_name, String & table_name);
/// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage. /** Выбрать из списка столбцов какой-нибудь, лучше - минимального размера.
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams); */
String getAnyColumn();
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression); /// Разные стадии выполнения запроса.
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,
bool overflow_row, bool final);
void executeMergeAggregated( BlockInputStreams & streams, bool overflow_row, bool final);
void executeTotalsAndHaving( BlockInputStreams & streams, bool has_having, ExpressionActionsPtr expression,
bool overflow_row);
void executeHaving( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeExpression( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeOrder( BlockInputStreams & streams);
void executePreLimit( BlockInputStreams & streams);
void executeUnion( BlockInputStreams & streams);
void executeLimit( BlockInputStreams & streams);
void executeProjection( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeDistinct( BlockInputStreams & streams, bool before_order, Names columns);
void executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, SubqueriesForSets & subqueries_for_sets);
ASTPtr query_ptr; /// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage.
ASTSelectQuery & query; QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams);
Context context;
Settings settings;
QueryProcessingStage::Enum to_stage;
size_t subquery_depth;
ExpressionAnalyzerPtr query_analyzer;
BlockInputStreams streams;
/// Таблица, откуда читать данные, если не подзапрос. void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
StoragePtr storage; void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,
IStorage::TableStructureReadLockPtr table_lock; bool overflow_row, bool final);
void executeMergeAggregated( BlockInputStreams & streams, bool overflow_row, bool final);
void executeTotalsAndHaving( BlockInputStreams & streams, bool has_having, ExpressionActionsPtr expression,
bool overflow_row);
void executeHaving( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeExpression( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeOrder( BlockInputStreams & streams);
void executePreLimit( BlockInputStreams & streams);
void executeUnion( BlockInputStreams & streams);
void executeLimit( BlockInputStreams & streams);
void executeProjection( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeDistinct( BlockInputStreams & streams, bool before_order, Names columns);
void executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, SubqueriesForSets & subqueries_for_sets);
Logger * log; ASTPtr query_ptr;
ASTSelectQuery & query;
Context context;
Settings settings;
QueryProcessingStage::Enum to_stage;
size_t subquery_depth;
ExpressionAnalyzerPtr query_analyzer;
BlockInputStreams streams;
/** Цепочка UNION ALL может иметь длину 1 (в таком случае имеется просто один запрос SELECT)
* или больше. Этот флаг установлен, если это первый запрос, возможно единственный, этой цепочки.
*/
bool is_union_all_head;
/// Следующий запрос SELECT в цепочке UNION ALL.
std::unique_ptr<InterpreterSelectQuery> next_select_in_union_all;
/// Таблица, откуда читать данные, если не подзапрос.
StoragePtr storage;
IStorage::TableStructureReadLockPtr table_lock;
Logger * log;
}; };
} }

View File

@ -81,7 +81,7 @@ struct Settings
\ \
M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM) \ M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM) \
\ \
M(SettingTotalsMode, totals_mode, TotalsMode::BEFORE_HAVING) \ M(SettingTotalsMode, totals_mode, TotalsMode::AFTER_HAVING_EXCLUSIVE) \
M(SettingFloat, totals_auto_threshold, 0.5) \ M(SettingFloat, totals_auto_threshold, 0.5) \
\ \
/** Сэмплирование по умолчанию. Если равно 1, то отключено. */ \ /** Сэмплирование по умолчанию. Если равно 1, то отключено. */ \

View File

@ -15,7 +15,7 @@ void sortBlock(Block & block, const SortDescription & description, size_t limit
* Сортировка стабильная. Это важно для сохранения порядка записей в движке CollapsingMergeTree * Сортировка стабильная. Это важно для сохранения порядка записей в движке CollapsingMergeTree
* - так как на основе порядка записей определяется, удалять ли или оставлять группы строчек при коллапсировании. * - так как на основе порядка записей определяется, удалять ли или оставлять группы строчек при коллапсировании.
* Не поддерживаются collations. Не поддерживается частичная сортировка. * Не поддерживаются collations. Не поддерживается частичная сортировка.
*/ */
void stableSortBlock(Block & block, const SortDescription & description); void stableSortBlock(Block & block, const SortDescription & description);
} }

View File

@ -87,9 +87,9 @@ public:
ASTAlterQuery(StringRange range_ = StringRange()) : IAST(range_) {}; ASTAlterQuery(StringRange range_ = StringRange()) : IAST(range_) {};
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return ("AlterQuery_" + database + "_" + table); }; String getID() const override { return ("AlterQuery_" + database + "_" + table); };
ASTPtr clone() const ASTPtr clone() const override
{ {
ASTAlterQuery * res = new ASTAlterQuery(*this); ASTAlterQuery * res = new ASTAlterQuery(*this);
for (ParameterContainer::size_type i = 0; i < parameters.size(); ++i) for (ParameterContainer::size_type i = 0; i < parameters.size(); ++i)

View File

@ -11,11 +11,11 @@ namespace DB
class ASTAsterisk : public IAST class ASTAsterisk : public IAST
{ {
public: public:
ASTAsterisk() {} ASTAsterisk() = default;
ASTAsterisk(StringRange range_) : IAST(range_) {} ASTAsterisk(StringRange range_) : IAST(range_) {}
String getID() const { return "Asterisk"; } String getID() const override { return "Asterisk"; }
ASTPtr clone() const { return new ASTAsterisk(*this); } ASTPtr clone() const override { return new ASTAsterisk(*this); }
String getColumnName() const { return "*"; } String getColumnName() const override { return "*"; }
}; };
} }

View File

@ -8,12 +8,11 @@ namespace DB
struct ASTCheckQuery : public IAST struct ASTCheckQuery : public IAST
{ {
ASTCheckQuery(StringRange range_ = StringRange()) : IAST(range_) {}; ASTCheckQuery(StringRange range_ = StringRange()) : IAST(range_) {};
ASTCheckQuery(const ASTCheckQuery & ast) = default;
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return ("CheckQuery_" + database + "_" + table); }; String getID() const override { return ("CheckQuery_" + database + "_" + table); };
ASTPtr clone() const ASTPtr clone() const override
{ {
return new ASTCheckQuery(*this); return new ASTCheckQuery(*this);
} }

View File

@ -17,13 +17,15 @@ public:
ASTPtr default_expression; ASTPtr default_expression;
ASTColumnDeclaration() = default; ASTColumnDeclaration() = default;
ASTColumnDeclaration(StringRange range) : IAST{range} {} ASTColumnDeclaration(const StringRange range) : IAST{range} {}
String getID() const { return "ColumnDeclaration_" + name; } String getID() const override { return "ColumnDeclaration_" + name; }
ASTPtr clone() const ASTPtr clone() const override
{ {
const auto res = new ASTColumnDeclaration{*this}; const auto res = new ASTColumnDeclaration{*this};
ASTPtr ptr{res};
res->children.clear(); res->children.clear();
if (type) { if (type) {
@ -36,7 +38,7 @@ public:
res->children.push_back(res->default_expression); res->children.push_back(res->default_expression);
} }
return res; return ptr;
} }
}; };

View File

@ -13,12 +13,12 @@ namespace DB
class ASTCreateQuery : public IAST class ASTCreateQuery : public IAST
{ {
public: public:
bool attach; /// Запрос ATTACH TABLE, а не CREATE TABLE. bool attach{false}; /// Запрос ATTACH TABLE, а не CREATE TABLE.
bool if_not_exists; bool if_not_exists{false};
bool is_view; bool is_view{false};
bool is_materialized_view; bool is_materialized_view{false};
bool is_populate; bool is_populate{false};
bool is_temporary; bool is_temporary{false};
String database; String database;
String table; String table;
ASTPtr columns; ASTPtr columns;
@ -28,15 +28,17 @@ public:
String as_table; String as_table;
ASTPtr select; ASTPtr select;
ASTCreateQuery() : attach(false), if_not_exists(false), is_view(false), is_materialized_view(false), is_populate(false), is_temporary(false) {} ASTCreateQuery() = default;
ASTCreateQuery(StringRange range_) : IAST(range_), attach(false), if_not_exists(false), is_view(false), is_materialized_view(false), is_populate(false), is_temporary(false) {} ASTCreateQuery(const StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return (attach ? "AttachQuery_" : "CreateQuery_") + database + "_" + table; }; String getID() const override { return (attach ? "AttachQuery_" : "CreateQuery_") + database + "_" + table; };
ASTPtr clone() const ASTPtr clone() const override
{ {
ASTCreateQuery * res = new ASTCreateQuery(*this); ASTCreateQuery * res = new ASTCreateQuery(*this);
ASTPtr ptr{res};
res->children.clear(); res->children.clear();
if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); } if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); }
@ -44,7 +46,7 @@ public:
if (select) { res->select = select->clone(); res->children.push_back(res->select); } if (select) { res->select = select->clone(); res->children.push_back(res->select); }
if (inner_storage) { res->inner_storage = inner_storage->clone(); res->children.push_back(res->inner_storage); } if (inner_storage) { res->inner_storage = inner_storage->clone(); res->children.push_back(res->inner_storage); }
return res; return ptr;
} }
}; };

View File

@ -12,18 +12,18 @@ namespace DB
class ASTDropQuery : public IAST class ASTDropQuery : public IAST
{ {
public: public:
bool detach; /// Запрос DETACH, а не DROP. bool detach{false}; /// Запрос DETACH, а не DROP.
bool if_exists; bool if_exists{false};
String database; String database;
String table; String table;
ASTDropQuery() : detach(false), if_exists(false) {} ASTDropQuery() = default;
ASTDropQuery(StringRange range_) : IAST(range_), detach(false), if_exists(false) {} ASTDropQuery(const StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return (detach ? "DetachQuery_" : "DropQuery_") + database + "_" + table; }; String getID() const override { return (detach ? "DetachQuery_" : "DropQuery_") + database + "_" + table; };
ASTPtr clone() const { return new ASTDropQuery(*this); } ASTPtr clone() const override { return new ASTDropQuery(*this); }
}; };
} }

View File

@ -14,21 +14,22 @@ using Poco::SharedPtr;
class ASTExpressionList : public IAST class ASTExpressionList : public IAST
{ {
public: public:
ASTExpressionList() {} ASTExpressionList() = default;
ASTExpressionList(StringRange range_) : IAST(range_) {} ASTExpressionList(const StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "ExpressionList"; } String getID() const override { return "ExpressionList"; }
ASTPtr clone() const ASTPtr clone() const override
{ {
ASTExpressionList * res = new ASTExpressionList(*this); const auto res = new ASTExpressionList(*this);
ASTPtr ptr{res};
res->children.clear(); res->children.clear();
for (ASTs::const_iterator it = children.begin(); it != children.end(); ++it) for (const auto & child : children)
res->children.push_back((*it)->clone()); res->children.emplace_back(child->clone());
return res; return ptr;
} }
}; };

View File

@ -32,12 +32,12 @@ public:
/// параметры - для параметрических агрегатных функций. Пример: quantile(0.9)(x) - то, что в первых скобках - параметры. /// параметры - для параметрических агрегатных функций. Пример: quantile(0.9)(x) - то, что в первых скобках - параметры.
ASTPtr parameters; ASTPtr parameters;
FunctionKind kind; FunctionKind kind{UNKNOWN};
ASTFunction() : kind(UNKNOWN) {} ASTFunction() = default;
ASTFunction(StringRange range_) : ASTWithAlias(range_), kind(UNKNOWN) {} ASTFunction(const StringRange range_) : ASTWithAlias(range_) {}
String getColumnName() const String getColumnName() const override
{ {
String res; String res;
WriteBufferFromString wb(res); WriteBufferFromString wb(res);
@ -68,17 +68,19 @@ public:
} }
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "Function_" + name; } String getID() const override { return "Function_" + name; }
ASTPtr clone() const ASTPtr clone() const override
{ {
ASTFunction * res = new ASTFunction(*this); ASTFunction * res = new ASTFunction(*this);
ASTPtr ptr{res};
res->children.clear(); res->children.clear();
if (arguments) { res->arguments = arguments->clone(); res->children.push_back(res->arguments); } if (arguments) { res->arguments = arguments->clone(); res->children.push_back(res->arguments); }
if (parameters) { res->parameters = parameters->clone(); res->children.push_back(res->parameters); } if (parameters) { res->parameters = parameters->clone(); res->children.push_back(res->parameters); }
return res; return ptr;
} }
}; };

View File

@ -26,17 +26,18 @@ public:
/// чего идентифицирует этот идентификатор /// чего идентифицирует этот идентификатор
Kind kind; Kind kind;
ASTIdentifier() {} ASTIdentifier() = default;
ASTIdentifier(StringRange range_, const String & name_, Kind kind_ = Column) : ASTWithAlias(range_), name(name_), kind(kind_) {} ASTIdentifier(const StringRange range_, const String & name_, const Kind kind_ = Column)
: ASTWithAlias(range_), name(name_), kind(kind_) {}
String getColumnName() const { return name; } String getColumnName() const override { return name; }
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "Identifier_" + name; } String getID() const override { return "Identifier_" + name; }
ASTPtr clone() const { return new ASTIdentifier(*this); } ASTPtr clone() const override { return new ASTIdentifier(*this); }
void collectIdentifierNames(IdentifierNameSet & set) const void collectIdentifierNames(IdentifierNameSet & set) const override
{ {
set.insert(name); set.insert(name);
} }

View File

@ -24,21 +24,23 @@ public:
const char * data = nullptr; const char * data = nullptr;
const char * end = nullptr; const char * end = nullptr;
ASTInsertQuery() {} ASTInsertQuery() = default;
ASTInsertQuery(StringRange range_) : IAST(range_) {} ASTInsertQuery(const StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "InsertQuery_" + database + "_" + table; }; String getID() const override { return "InsertQuery_" + database + "_" + table; };
ASTPtr clone() const ASTPtr clone() const override
{ {
ASTInsertQuery * res = new ASTInsertQuery(*this); ASTInsertQuery * res = new ASTInsertQuery(*this);
ASTPtr ptr{res};
res->children.clear(); res->children.clear();
if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); } if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); }
if (select) { res->select = select->clone(); res->children.push_back(res->select); } if (select) { res->select = select->clone(); res->children.push_back(res->select); }
return res; return ptr;
} }
}; };

View File

@ -40,11 +40,11 @@ public:
ASTPtr table; /// "Правая" таблица для соединения - подзапрос или имя таблицы. ASTPtr table; /// "Правая" таблица для соединения - подзапрос или имя таблицы.
ASTPtr using_expr_list; /// По каким столбцам выполнять соединение. ASTPtr using_expr_list; /// По каким столбцам выполнять соединение.
ASTJoin() {} ASTJoin() = default;
ASTJoin(StringRange range_) : IAST(range_) {} ASTJoin(const StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const String getID() const override
{ {
String res; String res;
{ {
@ -57,19 +57,21 @@ public:
writeString(kind == Inner ? "Inner" : "Left", wb); writeString(kind == Inner ? "Inner" : "Left", wb);
writeString("Join", wb); writeString("Join", wb);
} }
return res;
return res;
}; };
ASTPtr clone() const ASTPtr clone() const override
{ {
ASTJoin * res = new ASTJoin(*this); ASTJoin * res = new ASTJoin(*this);
ASTPtr ptr{res};
res->children.clear(); res->children.clear();
if (table) { res->table = table->clone(); res->children.push_back(res->table); } if (table) { res->table = table->clone(); res->children.push_back(res->table); }
if (using_expr_list) { res->using_expr_list = using_expr_list->clone(); res->children.push_back(res->using_expr_list); } if (using_expr_list) { res->using_expr_list = using_expr_list->clone(); res->children.push_back(res->using_expr_list); }
return res; return ptr;
} }
}; };

View File

@ -17,15 +17,15 @@ public:
/// тип /// тип
DataTypePtr type; DataTypePtr type;
ASTLiteral() {} ASTLiteral() = default;
ASTLiteral(StringRange range_, const Field & value_) : ASTWithAlias(range_), value(value_) {} ASTLiteral(const StringRange range_, const Field & value_) : ASTWithAlias(range_), value(value_) {}
String getColumnName() const { return apply_visitor(FieldVisitorToString(), value); } String getColumnName() const override { return apply_visitor(FieldVisitorToString(), value); }
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "Literal_" + apply_visitor(FieldVisitorDump(), value); } String getID() const override { return "Literal_" + apply_visitor(FieldVisitorDump(), value); }
ASTPtr clone() const { return new ASTLiteral(*this); } ASTPtr clone() const override { return new ASTLiteral(*this); }
}; };
} }

View File

@ -17,20 +17,22 @@ public:
/// тип /// тип
ASTPtr type; ASTPtr type;
ASTNameTypePair() {} ASTNameTypePair() = default;
ASTNameTypePair(StringRange range_) : IAST(range_) {} ASTNameTypePair(const StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "NameTypePair_" + name; } String getID() const override { return "NameTypePair_" + name; }
ASTPtr clone() const ASTPtr clone() const override
{ {
ASTNameTypePair * res = new ASTNameTypePair(*this); ASTNameTypePair * res = new ASTNameTypePair(*this);
ASTPtr ptr{res};
res->children.clear(); res->children.clear();
if (type) { res->type = type->clone(); res->children.push_back(res->type); } if (type) { res->type = type->clone(); res->children.push_back(res->type); }
return res; return ptr;
} }
}; };

View File

@ -15,13 +15,13 @@ public:
String database; String database;
String table; String table;
ASTOptimizeQuery() {} ASTOptimizeQuery() = default;
ASTOptimizeQuery(StringRange range_) : IAST(range_) {} ASTOptimizeQuery(const StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "OptimizeQuery_" + database + "_" + table; }; String getID() const override { return "OptimizeQuery_" + database + "_" + table; };
ASTPtr clone() const { return new ASTOptimizeQuery(*this); } ASTPtr clone() const override { return new ASTOptimizeQuery(*this); }
}; };
} }

View File

@ -21,14 +21,14 @@ public:
*/ */
Poco::SharedPtr<Collator> collator; Poco::SharedPtr<Collator> collator;
ASTOrderByElement() {} ASTOrderByElement() = default;
ASTOrderByElement(StringRange range_, int direction_, const Poco::SharedPtr<Collator> & collator_ = nullptr) ASTOrderByElement(const StringRange range_, const int direction_, const Poco::SharedPtr<Collator> & collator_ = nullptr)
: IAST(range_), direction(direction_), collator(collator_) {} : IAST(range_), direction(direction_), collator(collator_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "OrderByElement"; } String getID() const override { return "OrderByElement"; }
ASTPtr clone() const { return new ASTOrderByElement(*this); } ASTPtr clone() const override { return new ASTOrderByElement(*this); }
}; };
} }

View File

@ -14,8 +14,8 @@ class ASTQueryWithOutput : public IAST
public: public:
ASTPtr format; ASTPtr format;
ASTQueryWithOutput() {} ASTQueryWithOutput() = default;
ASTQueryWithOutput(StringRange range_) : IAST(range_) {} ASTQueryWithOutput(const StringRange range_) : IAST(range_) {}
}; };
@ -26,18 +26,19 @@ class Name : public ASTQueryWithOutput \
public: \ public: \
Name() {} \ Name() {} \
Name(StringRange range_) : ASTQueryWithOutput(range_) {} \ Name(StringRange range_) : ASTQueryWithOutput(range_) {} \
String getID() const { return ID; }; \ String getID() const override { return ID; }; \
\ \
ASTPtr clone() const \ ASTPtr clone() const override \
{ \ { \
Name * res = new Name(*this); \ Name * res = new Name(*this); \
ASTPtr ptr{res}; \
res->children.clear(); \ res->children.clear(); \
if (format) \ if (format) \
{ \ { \
res->format = format->clone(); \ res->format = format->clone(); \
res->children.push_back(res->format); \ res->children.push_back(res->format); \
} \ } \
return res; \ return ptr; \
} \ } \
}; };

View File

@ -16,8 +16,8 @@ namespace DB
String database; String database;
String table; String table;
ASTQueryWithTableAndOutput() {} ASTQueryWithTableAndOutput() = default;
ASTQueryWithTableAndOutput(StringRange range_) : ASTQueryWithOutput(range_) {} ASTQueryWithTableAndOutput(const StringRange range_) : ASTQueryWithOutput(range_) {}
}; };
@ -26,20 +26,21 @@ namespace DB
class Name : public ASTQueryWithTableAndOutput \ class Name : public ASTQueryWithTableAndOutput \
{ \ { \
public: \ public: \
Name() {} \ Name() = default; \
Name(StringRange range_) : ASTQueryWithTableAndOutput(range_) {} \ Name(const StringRange range_) : ASTQueryWithTableAndOutput(range_) {} \
String getID() const { return ID"_" + database + "_" + table; }; \ String getID() const override { return ID"_" + database + "_" + table; }; \
\ \
ASTPtr clone() const \ ASTPtr clone() const override \
{ \ { \
Name * res = new Name(*this); \ Name * res = new Name(*this); \
ASTPtr ptr{res}; \
res->children.clear(); \ res->children.clear(); \
if (format) \ if (format) \
{ \ { \
res->format = format->clone(); \ res->format = format->clone(); \
res->children.push_back(res->format); \ res->children.push_back(res->format); \
} \ } \
return res; \ return ptr; \
} \ } \
}; };
} }

View File

@ -27,13 +27,13 @@ public:
typedef std::vector<Element> Elements; typedef std::vector<Element> Elements;
Elements elements; Elements elements;
ASTRenameQuery() {} ASTRenameQuery() = default;
ASTRenameQuery(StringRange range_) : IAST(range_) {} ASTRenameQuery(const StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "Rename"; }; String getID() const override { return "Rename"; };
ASTPtr clone() const { return new ASTRenameQuery(*this); } ASTPtr clone() const override { return new ASTRenameQuery(*this); }
}; };
} }

View File

@ -30,12 +30,13 @@ public:
ASTPtr order_expression_list; ASTPtr order_expression_list;
ASTPtr limit_offset; ASTPtr limit_offset;
ASTPtr limit_length; ASTPtr limit_length;
ASTPtr next_union_all; /// Следующий запрос SELECT в цепочке UNION ALL, если такой есть
ASTSelectQuery() {} ASTSelectQuery() = default;
ASTSelectQuery(StringRange range_) : ASTQueryWithOutput(range_) {} ASTSelectQuery(const StringRange range_) : ASTQueryWithOutput(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "SelectQuery"; }; String getID() const override { return "SelectQuery"; };
/// Проверить наличие функции arrayJoin. (Не большого ARRAY JOIN.) /// Проверить наличие функции arrayJoin. (Не большого ARRAY JOIN.)
static bool hasArrayJoin(const ASTPtr & ast) static bool hasArrayJoin(const ASTPtr & ast)
@ -102,9 +103,11 @@ public:
*/ */
} }
ASTPtr clone() const ASTPtr clone() const override
{ {
ASTSelectQuery * res = new ASTSelectQuery(*this); ASTSelectQuery * res = new ASTSelectQuery(*this);
ASTPtr ptr{res};
res->children.clear(); res->children.clear();
#define CLONE(member) if (member) { res->member = member->clone(); res->children.push_back(res->member); } #define CLONE(member) if (member) { res->member = member->clone(); res->children.push_back(res->member); }
@ -123,10 +126,11 @@ public:
CLONE(limit_offset) CLONE(limit_offset)
CLONE(limit_length) CLONE(limit_length)
CLONE(format) CLONE(format)
CLONE(next_union_all)
#undef CLONE #undef CLONE
return res; return ptr;
} }
}; };

View File

@ -18,10 +18,10 @@ public:
bool is_explicit = false; bool is_explicit = false;
ASTSet(const String & column_name_) : column_name(column_name_) {} ASTSet(const String & column_name_) : column_name(column_name_) {}
ASTSet(StringRange range_, const String & column_name_) : IAST(range_), column_name(column_name_) {} ASTSet(const StringRange range_, const String & column_name_) : IAST(range_), column_name(column_name_) {}
String getID() const { return "Set_" + getColumnName(); } String getID() const override { return "Set_" + getColumnName(); }
ASTPtr clone() const { return new ASTSet(*this); } ASTPtr clone() const override { return new ASTSet(*this); }
String getColumnName() const { return column_name; } String getColumnName() const override { return column_name; }
}; };
} }

View File

@ -24,13 +24,13 @@ public:
bool global; /// Если запрос SET GLOBAL. bool global; /// Если запрос SET GLOBAL.
ASTSetQuery() {} ASTSetQuery() = default;
ASTSetQuery(StringRange range_) : IAST(range_) {} ASTSetQuery(const StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "Set"; }; String getID() const override { return "Set"; };
ASTPtr clone() const { return new ASTSetQuery(*this); } ASTPtr clone() const override { return new ASTSetQuery(*this); }
}; };
} }

View File

@ -13,20 +13,22 @@ namespace DB
class ASTShowTablesQuery : public ASTQueryWithOutput class ASTShowTablesQuery : public ASTQueryWithOutput
{ {
public: public:
bool databases; bool databases{false};
String from; String from;
String like; String like;
bool not_like; bool not_like{false};
ASTShowTablesQuery() : databases(false), not_like(false) {} ASTShowTablesQuery() = default;
ASTShowTablesQuery(StringRange range_) : ASTQueryWithOutput(range_), databases(false), not_like(false) {} ASTShowTablesQuery(const StringRange range_) : ASTQueryWithOutput(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "ShowTables"; }; String getID() const override { return "ShowTables"; };
ASTPtr clone() const ASTPtr clone() const override
{ {
ASTShowTablesQuery * res = new ASTShowTablesQuery(*this); ASTShowTablesQuery * res = new ASTShowTablesQuery(*this);
ASTPtr ptr{res};
res->children.clear(); res->children.clear();
if (format) if (format)
@ -35,7 +37,7 @@ public:
res->children.push_back(res->format); res->children.push_back(res->format);
} }
return res; return ptr;
} }
}; };

View File

@ -9,25 +9,31 @@ namespace DB
{ {
/** Подзарос SELECT в секции IN. /** Подзарос SELECT
*/ */
class ASTSubquery : public IAST class ASTSubquery : public IAST
{ {
public: public:
/// тип возвращаемого значения ASTSubquery() = default;
DataTypePtr return_type; ASTSubquery(const StringRange range_) : IAST(range_) {}
/// номер столбца возвращаемого значения
size_t return_column_number;
ASTSubquery() {}
ASTSubquery(StringRange range_) : IAST(range_), return_column_number(0) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "Subquery"; }; String getID() const override { return "Subquery"; }
ASTPtr clone() const { return new ASTSubquery(*this); } ASTPtr clone() const override
{
const auto res = new ASTSubquery{*this};
ASTPtr ptr{res};
String getColumnName() const { return getTreeID(); } res->children.clear();
for (const auto & child : children)
res->children.emplace_back(child->clone());
return ptr;
}
String getColumnName() const override { return getTreeID(); }
}; };
} }

View File

@ -14,13 +14,13 @@ class ASTUseQuery : public IAST
public: public:
String database; String database;
ASTUseQuery() {} ASTUseQuery() = default;
ASTUseQuery(StringRange range_) : IAST(range_) {} ASTUseQuery(const StringRange range_) : IAST(range_) {}
/** Получить текст, который идентифицирует этот элемент. */ /** Получить текст, который идентифицирует этот элемент. */
String getID() const { return "UseQuery_" + database; }; String getID() const override { return "UseQuery_" + database; };
ASTPtr clone() const { return new ASTUseQuery(*this); } ASTPtr clone() const override { return new ASTUseQuery(*this); }
}; };
} }

View File

@ -23,8 +23,7 @@ public:
/// helper for setting aliases and chaining result to other functions /// helper for setting aliases and chaining result to other functions
inline ASTPtr setAlias(ASTPtr ast, const String & alias) { inline ASTPtr setAlias(ASTPtr ast, const String & alias) {
dynamic_cast<ASTWithAlias &>(*ast).alias = alias; ast->setAlias(alias);
return ast; return ast;
}; };

View File

@ -38,9 +38,9 @@ public:
*/ */
StringPtr query_string; StringPtr query_string;
IAST() {} IAST() = default;
IAST(StringRange range_) : range(range_) {} IAST(const StringRange range_) : range(range_) {}
virtual ~IAST() {} virtual ~IAST() = default;
/** Получить каноническое имя столбца, если элемент является столбцом */ /** Получить каноническое имя столбца, если элемент является столбцом */
virtual String getColumnName() const { throw Exception("Trying to get name of not a column: " + getID(), ErrorCodes::NOT_A_COLUMN); } virtual String getColumnName() const { throw Exception("Trying to get name of not a column: " + getID(), ErrorCodes::NOT_A_COLUMN); }

View File

@ -162,6 +162,7 @@ public:
virtual BlockInputStreams read( virtual BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -191,8 +191,11 @@ public:
void checkSizes(const String & path) const; void checkSizes(const String & path) const;
/// Сериализует и десериализует в человекочитаемом виде. /// Сериализует и десериализует в человекочитаемом виде.
bool readText(ReadBuffer & in); /// Возвращает false, если чексуммы в слишком старом формате. bool read(ReadBuffer & in); /// Возвращает false, если чексуммы в слишком старом формате.
void writeText(WriteBuffer & out) const; bool read_v2(ReadBuffer & in);
bool read_v3(ReadBuffer & in);
bool read_v4(ReadBuffer & in);
void write(WriteBuffer & out) const;
bool empty() const bool empty() const
{ {
@ -228,7 +231,7 @@ public:
String s; String s;
{ {
WriteBufferFromString out(s); WriteBufferFromString out(s);
writeText(out); write(out);
} }
return s; return s;
} }
@ -237,7 +240,7 @@ public:
{ {
ReadBufferFromString in(s); ReadBufferFromString in(s);
Checksums res; Checksums res;
if (!res.readText(in)) if (!res.read(in))
throw Exception("Checksums format is too old", ErrorCodes::FORMAT_VERSION_TOO_OLD); throw Exception("Checksums format is too old", ErrorCodes::FORMAT_VERSION_TOO_OLD);
assertEOF(in); assertEOF(in);
return res; return res;
@ -385,7 +388,7 @@ public:
return; return;
} }
ReadBufferFromFile file(path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize())); ReadBufferFromFile file(path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
if (checksums.readText(file)) if (checksums.read(file))
assertEOF(file); assertEOF(file);
} }

View File

@ -2,6 +2,7 @@
#include <DB/Storages/MergeTree/MergeTreeData.h> #include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h> #include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <atomic>
namespace DB namespace DB
{ {
@ -13,7 +14,7 @@ class MergeTreeDataMerger
public: public:
static const size_t NO_LIMIT = std::numeric_limits<size_t>::max(); static const size_t NO_LIMIT = std::numeric_limits<size_t>::max();
MergeTreeDataMerger(MergeTreeData & data_) : data(data_), log(&Logger::get(data.getLogName() + " (Merger)")), canceled(false) {} MergeTreeDataMerger(MergeTreeData & data_) : data(data_), log(&Logger::get(data.getLogName() + " (Merger)")) {}
typedef std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> AllowedMergingPredicate; typedef std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> AllowedMergingPredicate;
@ -49,9 +50,8 @@ public:
/** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение. /** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение.
* Все новые вызовы будут бросать исключения, пока не будет вызван uncancelAll(). * Все новые вызовы будут бросать исключения, пока не будет вызван uncancelAll().
*/ */
void cancelAll() { canceled = true; } bool cancelAll() { return canceled.exchange(true, std::memory_order_relaxed); }
bool uncancelAll() { return canceled.exchange(false, std::memory_order_relaxed); }
void uncancelAll() { canceled = false; }
private: private:
MergeTreeData & data; MergeTreeData & data;
@ -61,7 +61,24 @@ private:
/// Когда в последний раз писали в лог, что место на диске кончилось (чтобы не писать об этом слишком часто). /// Когда в последний раз писали в лог, что место на диске кончилось (чтобы не писать об этом слишком часто).
time_t disk_space_warning_time = 0; time_t disk_space_warning_time = 0;
volatile bool canceled; std::atomic<bool> canceled{false};
};
class MergeTreeMergeBlocker
{
public:
MergeTreeMergeBlocker(MergeTreeDataMerger & merger)
: merger(merger), was_cancelled{merger.cancelAll()} {}
~MergeTreeMergeBlocker()
{
if (was_cancelled)
merger.uncancelAll();
}
private:
MergeTreeDataMerger & merger;
const bool was_cancelled;
}; };
} }

View File

@ -20,6 +20,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -38,10 +38,11 @@ class MergeTreeReader
typedef std::map<std::string, ColumnPtr> OffsetColumns; typedef std::map<std::string, ColumnPtr> OffsetColumns;
public: public:
MergeTreeReader(const String & path_, const MergeTreeData::DataPartPtr & data_part, /// Путь к куску MergeTreeReader(const String & path_, /// Путь к куску
const NamesAndTypesList & columns_, bool use_uncompressed_cache_, MergeTreeData & storage_, const MarkRanges & all_mark_ranges) const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns_,
: path(path_), data_part(data_part), part_name(data_part->name), columns(columns_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_), bool use_uncompressed_cache_, MergeTreeData & storage_, const MarkRanges & all_mark_ranges)
all_mark_ranges(all_mark_ranges) : path(path_), data_part(data_part), part_name(data_part->name), columns(columns_),
use_uncompressed_cache(use_uncompressed_cache_), storage(storage_), all_mark_ranges(all_mark_ranges)
{ {
try try
{ {

View File

@ -294,7 +294,7 @@ public:
{ {
/// Записываем файл с чексуммами. /// Записываем файл с чексуммами.
WriteBufferFromFile out(part_path + "checksums.txt", 4096); WriteBufferFromFile out(part_path + "checksums.txt", 4096);
checksums.writeText(out); checksums.write(out);
} }
return checksums; return checksums;

View File

@ -22,12 +22,14 @@ public:
void write(const Block & block) override void write(const Block & block) override
{ {
assertSessionIsNotExpired(); auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper);
auto part_blocks = storage.writer.splitBlockIntoParts(block); auto part_blocks = storage.writer.splitBlockIntoParts(block);
for (auto & current_block : part_blocks) for (auto & current_block : part_blocks)
{ {
assertSessionIsNotExpired(); assertSessionIsNotExpired(zookeeper);
++block_index; ++block_index;
String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index); String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index);
@ -60,29 +62,29 @@ public:
ops.push_back(new zkutil::Op::Create( ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id, storage.zookeeper_path + "/blocks/" + block_id,
"", "",
storage.zookeeper->getDefaultACL(), zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent)); zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create( ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/columns", storage.zookeeper_path + "/blocks/" + block_id + "/columns",
part->columns.toString(), part->columns.toString(),
storage.zookeeper->getDefaultACL(), zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent)); zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create( ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
part->checksums.toString(), part->checksums.toString(),
storage.zookeeper->getDefaultACL(), zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent)); zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create( ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/number", storage.zookeeper_path + "/blocks/" + block_id + "/number",
toString(part_number), toString(part_number),
storage.zookeeper->getDefaultACL(), zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent)); zkutil::CreateMode::Persistent));
} }
storage.checkPartAndAddToZooKeeper(part, ops, part_name); storage.checkPartAndAddToZooKeeper(part, ops, part_name);
ops.push_back(new zkutil::Op::Create( ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/log/log-", storage.zookeeper_path + "/log/log-",
log_entry.toString(), log_entry.toString(),
storage.zookeeper->getDefaultACL(), zookeeper->getDefaultACL(),
zkutil::CreateMode::PersistentSequential)); zkutil::CreateMode::PersistentSequential));
block_number_lock.getUnlockOps(ops); block_number_lock.getUnlockOps(ops);
@ -91,7 +93,7 @@ public:
try try
{ {
auto code = storage.zookeeper->tryMulti(ops); auto code = zookeeper->tryMulti(ops);
if (code == ZOK) if (code == ZOK)
{ {
transaction.commit(); transaction.commit();
@ -101,7 +103,7 @@ public:
{ {
/// Если блок с таким ID уже есть в таблице, откатим его вставку. /// Если блок с таким ID уже есть в таблице, откатим его вставку.
String expected_checksums_str; String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper->tryGet( if (!block_id.empty() && zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str)) storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{ {
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")"); LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
@ -149,9 +151,9 @@ private:
/// Позволяет проверить, что сессия в ZooKeeper ещё жива. /// Позволяет проверить, что сессия в ZooKeeper ещё жива.
void assertSessionIsNotExpired() void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
{ {
if (storage.zookeeper->expired()) if (zookeeper->expired())
throw Exception("ZooKeeper session has been expired.", ErrorCodes::NO_ZOOKEEPER); throw Exception("ZooKeeper session has been expired.", ErrorCodes::NO_ZOOKEEPER);
} }
}; };

View File

@ -61,6 +61,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,
@ -107,6 +108,7 @@ private:
Logger * log; Logger * log;
Poco::Event shutdown_event;
/// Выполняет сброс данных по таймауту. /// Выполняет сброс данных по таймауту.
std::thread flush_thread; std::thread flush_thread;
@ -122,7 +124,6 @@ private:
/// Аргумент table передаётся, так как иногда вычисляется заранее. Он должен соответствовать destination-у. /// Аргумент table передаётся, так как иногда вычисляется заранее. Он должен соответствовать destination-у.
void writeBlockToDestination(const Block & block, StoragePtr table); void writeBlockToDestination(const Block & block, StoragePtr table);
Poco::Event shutdown_event;
void flushThread(); void flushThread();
}; };

View File

@ -40,6 +40,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -25,6 +25,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -37,6 +37,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,
@ -46,6 +47,7 @@ public:
const std::string & chunk_name, const std::string & chunk_name,
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -60,6 +60,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -162,6 +162,7 @@ public:
virtual BlockInputStreams read( virtual BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,
@ -227,6 +228,7 @@ protected:
size_t to_mark, size_t to_mark,
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -37,6 +37,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -83,6 +83,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -49,6 +49,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -70,6 +70,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -33,10 +33,11 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, const size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override const unsigned threads = 1) override
{ {
return { new NullBlockInputStream }; return { new NullBlockInputStream };
} }

View File

@ -74,6 +74,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,
@ -146,10 +147,24 @@ private:
typedef std::list<String> StringList; typedef std::list<String> StringList;
Context & context; Context & context;
zkutil::ZooKeeperPtr zookeeper;
zkutil::ZooKeeperPtr current_zookeeper; /// Используйте только с помощью методов ниже.
std::mutex current_zookeeper_mutex; /// Для пересоздания сессии в фоновом потоке.
zkutil::ZooKeeperPtr getZooKeeper()
{
std::lock_guard<std::mutex> lock(current_zookeeper_mutex);
return current_zookeeper;
}
void setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
{
std::lock_guard<std::mutex> lock(current_zookeeper_mutex);
current_zookeeper = zookeeper;
}
/// Если true, таблица в офлайновом режиме, и в нее нельзя писать. /// Если true, таблица в офлайновом режиме, и в нее нельзя писать.
bool is_read_only = false; bool is_readonly = false;
/// Каким будет множество активных кусков после выполнения всей текущей очереди. /// Каким будет множество активных кусков после выполнения всей текущей очереди.
ActiveDataPartSet virtual_parts; ActiveDataPartSet virtual_parts;
@ -210,6 +225,10 @@ private:
std::unique_ptr<MergeTreeDataMerger> unreplicated_merger; std::unique_ptr<MergeTreeDataMerger> unreplicated_merger;
std::mutex unreplicated_mutex; /// Для мерджей и удаления нереплицируемых кусков. std::mutex unreplicated_mutex; /// Для мерджей и удаления нереплицируемых кусков.
/// Нужно ли завершить фоновые потоки (кроме restarting_thread).
volatile bool shutdown_called = false;
Poco::Event shutdown_event;
/// Потоки: /// Потоки:
/// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь. /// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь.
@ -242,10 +261,6 @@ private:
Logger * log; Logger * log;
/// Нужно ли завершить фоновые потоки (кроме restarting_thread).
volatile bool shutdown_called = false;
Poco::Event shutdown_event;
StorageReplicatedMergeTree( StorageReplicatedMergeTree(
const String & zookeeper_path_, const String & zookeeper_path_,
const String & replica_name_, const String & replica_name_,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -26,6 +26,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -17,6 +17,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -28,6 +28,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -25,6 +25,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -129,6 +129,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -33,6 +33,7 @@ public:
BlockInputStreams read( BlockInputStreams read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE, size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -90,7 +90,12 @@ private:
NamesAndTypesList res; NamesAndTypesList res;
/// Отправляем на первый попавшийся шард /// Отправляем на первый попавшийся шард
BlockInputStreamPtr input = new RemoteBlockInputStream(&*cluster.pools.front(), query, &settings, Tables(), QueryProcessingStage::Complete); BlockInputStreamPtr input{
new RemoteBlockInputStream{
cluster.pools.front().get(), query, &settings,
Tables(), QueryProcessingStage::Complete, context
}
};
input->readPrefix(); input->readPrefix();
while (true) while (true)

View File

@ -96,6 +96,7 @@ private:
String format; /// Формат вывода результата в консоль. String format; /// Формат вывода результата в консоль.
size_t format_max_block_size = 0; /// Максимальный размер блока при выводе в консоль. size_t format_max_block_size = 0; /// Максимальный размер блока при выводе в консоль.
String current_format; /// Формат вывода результата текущего запроса в консоль.
String insert_format; /// Формат данных для INSERT-а при чтении их из stdin в batch режиме String insert_format; /// Формат данных для INSERT-а при чтении их из stdin в batch режиме
size_t insert_format_max_block_size = 0; /// Максимальный размер блока при чтении данных INSERT-а. size_t insert_format_max_block_size = 0; /// Максимальный размер блока при чтении данных INSERT-а.
@ -232,8 +233,15 @@ private:
<< "." << Revision::get() << "." << Revision::get()
<< "." << std::endl; << "." << std::endl;
format = config().getString("format", is_interactive ? "PrettyCompact" : "TabSeparated"); if (is_interactive)
{
format = config().getString("format", config().has("vertical") ? "Vertical" : "PrettyCompact");
}
else
format = config().getString("format", "TabSeparated");
format_max_block_size = config().getInt("format_max_block_size", DEFAULT_BLOCK_SIZE); format_max_block_size = config().getInt("format_max_block_size", DEFAULT_BLOCK_SIZE);
current_format = format;
insert_format = "Values"; insert_format = "Values";
insert_format_max_block_size = config().getInt("insert_format_max_block_size", DEFAULT_INSERT_BLOCK_SIZE); insert_format_max_block_size = config().getInt("insert_format_max_block_size", DEFAULT_INSERT_BLOCK_SIZE);
@ -350,16 +358,21 @@ private:
bool ends_with_semicolon = line[ws - 1] == ';'; bool ends_with_semicolon = line[ws - 1] == ';';
bool ends_with_backslash = line[ws - 1] == '\\'; bool ends_with_backslash = line[ws - 1] == '\\';
bool ends_with_format_vertical = (ws >= 2) && (line[ws - 2] == '\\') && (line[ws - 1] == 'G');
if (ends_with_backslash) if (ends_with_backslash)
line = line.substr(0, ws - 1); line = line.substr(0, ws - 1);
query += line; query += line;
if (!ends_with_backslash && (ends_with_semicolon || !config().has("multiline"))) if (!ends_with_backslash && (ends_with_semicolon || ends_with_format_vertical || !config().has("multiline")))
{ {
if (query != prev_query) if (query != prev_query)
{ {
// Заменяем переводы строк на пробелы, а то возникает следуцющая проблема.
// Каждая строчка многострочного запроса сохраняется в истории отдельно. Если
// выйти из клиента и войти заново, то при нажатии клавиши "вверх" выводится не
// весь многострочный запрос, а каждая его строчка по-отдельности.
std::string logged_query = query; std::string logged_query = query;
std::replace(logged_query.begin(), logged_query.end(), '\n', ' '); std::replace(logged_query.begin(), logged_query.end(), '\n', ' ');
add_history(logged_query.c_str()); add_history(logged_query.c_str());
@ -369,6 +382,12 @@ private:
prev_query = query; prev_query = query;
} }
if (ends_with_format_vertical)
{
current_format = config().getString("format", "Vertical");
query = query.substr(0, query.length() - 2);
}
try try
{ {
@ -392,6 +411,7 @@ private:
} }
query = ""; query = "";
current_format = format;
} }
else else
{ {
@ -572,11 +592,15 @@ private:
connection->sendQuery(query_without_data, "", QueryProcessingStage::Complete, &context.getSettingsRef(), true); connection->sendQuery(query_without_data, "", QueryProcessingStage::Complete, &context.getSettingsRef(), true);
sendExternalTables(); sendExternalTables();
/// Получим структуру таблицы /// Получаем структуру таблицы.
Block sample = receiveSampleBlock(); Block sample;
if (receiveSampleBlock(sample))
sendData(sample); {
receivePacket(); /// Если была получена структура, т.е. сервер не выкинул исключения,
/// отправляем эту структуру вместе с данными.
sendData(sample);
receivePacket();
}
} }
@ -758,15 +782,21 @@ private:
/** Получить блок - пример структуры таблицы, в которую будут вставляться данные. /** Получить блок - пример структуры таблицы, в которую будут вставляться данные.
*/ */
Block receiveSampleBlock() bool receiveSampleBlock(Block & out)
{ {
Connection::Packet packet = connection->receivePacket(); Connection::Packet packet = connection->receivePacket();
switch (packet.type) switch (packet.type)
{ {
case Protocol::Server::Data: case Protocol::Server::Data:
return packet.block; out = packet.block;
return true;
case Protocol::Server::Exception:
onException(*packet.exception);
last_exception = packet.exception;
return false;
default: default:
throw Exception("Unexpected packet from server (expected Data, got " throw Exception("Unexpected packet from server (expected Data, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER); + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
@ -785,8 +815,6 @@ private:
processed_rows += block.rows(); processed_rows += block.rows();
if (!block_std_out) if (!block_std_out)
{ {
String current_format = format;
/// Формат может быть указан в запросе. /// Формат может быть указан в запросе.
if (ASTQueryWithOutput * query_with_output = dynamic_cast<ASTQueryWithOutput *>(&*parsed_query)) if (ASTQueryWithOutput * query_with_output = dynamic_cast<ASTQueryWithOutput *>(&*parsed_query))
if (query_with_output->format) if (query_with_output->format)
@ -973,6 +1001,7 @@ public:
("database,d", boost::program_options::value<std::string>(), "database") ("database,d", boost::program_options::value<std::string>(), "database")
("multiline,m", "multiline") ("multiline,m", "multiline")
("multiquery,n", "multiquery") ("multiquery,n", "multiquery")
("vertical,E", "vertical")
APPLY_FOR_SETTINGS(DECLARE_SETTING) APPLY_FOR_SETTINGS(DECLARE_SETTING)
APPLY_FOR_LIMITS(DECLARE_LIMIT) APPLY_FOR_LIMITS(DECLARE_LIMIT)
; ;
@ -1085,6 +1114,8 @@ public:
config().setBool("multiline", true); config().setBool("multiline", true);
if (options.count("multiquery")) if (options.count("multiquery"))
config().setBool("multiquery", true); config().setBool("multiquery", true);
if (options.count("vertical"))
config().setBool("vertical", true);
} }
}; };

View File

@ -19,7 +19,7 @@ static std::string formatReadable(double size, int precision, const char ** unit
std::string formatReadableSizeWithBinarySuffix(double value, int precision) std::string formatReadableSizeWithBinarySuffix(double value, int precision)
{ {
const char * units[] = {" B", " KiB", " MiB", " GiB", " TiB", " PiB", " EiB", " ZiB", " YiB"}; const char * units[] = {" B", " KiB", " MiB", " GiB", " TiB", " PiB", " EiB", " ZiB", " YiB"};
return formatReadable(value, precision, units, sizeof(units) / sizeof(units[0]), 1024); return formatReadable(value, precision, units, sizeof(units) / sizeof(units[0]), 1024);
} }

View File

@ -363,7 +363,7 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{ {
const IDataType & lhs_type = *lhs.getByPosition(i).type; const IDataType & lhs_type = *lhs.getByPosition(i).type;
const IDataType & rhs_type = *rhs.getByPosition(i).type; const IDataType & rhs_type = *rhs.getByPosition(i).type;
if (lhs_type.getName() != rhs_type.getName()) if (lhs_type.getName() != rhs_type.getName())
return false; return false;
} }

View File

@ -73,11 +73,19 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks)
watch.stop(); watch.stop();
size_t rows_before_merge = 0;
size_t bytes_before_merge = 0;
for (const auto & block : blocks)
{
rows_before_merge += block.rowsInFirstColumn();
bytes_before_merge += block.bytes();
}
LOG_DEBUG(log, std::fixed << std::setprecision(2) LOG_DEBUG(log, std::fixed << std::setprecision(2)
<< "Merge sorted " << blocks.size() << " blocks, " << merged.rows() << " rows" << "Merge sorted " << blocks.size() << " blocks, from " << rows_before_merge << " to " << merged.rows() << " rows"
<< " in " << watch.elapsedSeconds() << " sec., " << " in " << watch.elapsedSeconds() << " sec., "
<< merged.rows() / watch.elapsedSeconds() << " rows/sec., " << rows_before_merge / watch.elapsedSeconds() << " rows/sec., "
<< merged.bytes() / 1000000.0 / watch.elapsedSeconds() << " MiB/sec."); << bytes_before_merge / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.");
return merged; return merged;
} }

View File

@ -65,7 +65,7 @@ int main(int argc, char ** argv)
QueryProcessingStage::Enum stage; QueryProcessingStage::Enum stage;
Poco::SharedPtr<IBlockInputStream> in; Poco::SharedPtr<IBlockInputStream> in;
in = table->read(column_names, 0, Settings(), stage)[0]; in = table->read(column_names, 0, context, Settings(), stage)[0];
in = new ExpressionBlockInputStream(in, expression); in = new ExpressionBlockInputStream(in, expression);
in = new LimitBlockInputStream(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10)); in = new LimitBlockInputStream(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));

View File

@ -70,7 +70,7 @@ int main(int argc, char ** argv)
QueryProcessingStage::Enum stage; QueryProcessingStage::Enum stage;
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Settings(), stage)[0]; Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, context, Settings(), stage)[0];
in = new ExpressionBlockInputStream(in, expression); in = new ExpressionBlockInputStream(in, expression);
in = new FilterBlockInputStream(in, 1); in = new FilterBlockInputStream(in, 1);
in = new LimitBlockInputStream(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10)); in = new LimitBlockInputStream(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));

View File

@ -142,7 +142,7 @@ int main(int argc, char ** argv)
QueryProcessingStage::Enum stage; QueryProcessingStage::Enum stage;
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Settings(), stage)[0]; Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, context, Settings(), stage)[0];
in = new ExpressionBlockInputStream(in, expression); in = new ExpressionBlockInputStream(in, expression);
in = new FilterBlockInputStream(in, 4); in = new FilterBlockInputStream(in, 4);
//in = new LimitBlockInputStream(in, 10); //in = new LimitBlockInputStream(in, 10);

View File

@ -85,7 +85,7 @@ int main(int argc, char ** argv)
QueryProcessingStage::Enum stage; QueryProcessingStage::Enum stage;
BlockInputStreamPtr in = table->read(column_names, 0, Settings(), stage)[0]; BlockInputStreamPtr in = table->read(column_names, 0, context, Settings(), stage)[0];
ForkBlockInputStreams fork(in); ForkBlockInputStreams fork(in);

View File

@ -21,6 +21,8 @@
#include <DB/Storages/StorageLog.h> #include <DB/Storages/StorageLog.h>
#include <DB/Interpreters/Context.h>
int main(int argc, char ** argv) int main(int argc, char ** argv)
{ {
@ -102,7 +104,7 @@ int main(int argc, char ** argv)
if (argc == 2 && 0 == strcmp(argv[1], "read")) if (argc == 2 && 0 == strcmp(argv[1], "read"))
{ {
QueryProcessingStage::Enum stage; QueryProcessingStage::Enum stage;
SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Settings(), stage)[0]; SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Context{}, Settings(), stage)[0];
WriteBufferFromOStream out1(std::cout); WriteBufferFromOStream out1(std::cout);
CompressedWriteBuffer out2(out1); CompressedWriteBuffer out2(out1);
NativeBlockOutputStream out3(out2); NativeBlockOutputStream out3(out2);

View File

@ -159,7 +159,7 @@ int main(int argc, char ** argv)
QueryProcessingStage::Enum stage; QueryProcessingStage::Enum stage;
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Settings(), stage, argc == 2 ? atoi(argv[1]) : 1048576)[0]; Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Context{}, Settings(), stage, argc == 2 ? atoi(argv[1]) : 1048576)[0];
in = new PartialSortingBlockInputStream(in, sort_columns); in = new PartialSortingBlockInputStream(in, sort_columns);
in = new MergeSortingBlockInputStream(in, sort_columns); in = new MergeSortingBlockInputStream(in, sort_columns);
//in = new LimitBlockInputStream(in, 10); //in = new LimitBlockInputStream(in, 10);

View File

@ -15,6 +15,8 @@
#include <DB/DataTypes/DataTypesNumberFixed.h> #include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Interpreters/Context.h>
using Poco::SharedPtr; using Poco::SharedPtr;
@ -32,9 +34,9 @@ int main(int argc, char ** argv)
DB::QueryProcessingStage::Enum stage3; DB::QueryProcessingStage::Enum stage3;
DB::BlockInputStreams streams; DB::BlockInputStreams streams;
streams.push_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Settings(), stage1, 1)[0], 30, 30000)); streams.emplace_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Context{}, DB::Settings(), stage1, 1)[0], 30, 30000));
streams.push_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Settings(), stage2, 1)[0], 30, 2000)); streams.emplace_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Context{}, DB::Settings(), stage2, 1)[0], 30, 2000));
streams.push_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Settings(), stage3, 1)[0], 30, 100)); streams.emplace_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Context{}, DB::Settings(), stage3, 1)[0], 30, 100));
DB::UnionBlockInputStream union_stream(streams, 2); DB::UnionBlockInputStream union_stream(streams, 2);

View File

@ -39,7 +39,7 @@ int main(int argc, char ** argv)
DB::StoragePtr table = context.getTable("default", "hits6"); DB::StoragePtr table = context.getTable("default", "hits6");
DB::QueryProcessingStage::Enum stage; DB::QueryProcessingStage::Enum stage;
DB::BlockInputStreams streams = table->read(column_names, nullptr, settings, stage, settings.max_block_size, settings.max_threads); DB::BlockInputStreams streams = table->read(column_names, nullptr, context, settings, stage, settings.max_block_size, settings.max_threads);
for (size_t i = 0, size = streams.size(); i < size; ++i) for (size_t i = 0, size = streams.size(); i < size; ++i)
streams[i] = new DB::AsynchronousBlockInputStream(streams[i]); streams[i] = new DB::AsynchronousBlockInputStream(streams[i]);

View File

@ -33,17 +33,18 @@ static void numWidthConstant(T a, UInt64 & c)
c = 2 + log10(-a); c = 2 + log10(-a);
} }
inline UInt64 floatWidth(double x) inline UInt64 floatWidth(const double x)
{ {
/// Не быстро. /// Не быстро.
unsigned size = WRITE_HELPERS_DEFAULT_FLOAT_PRECISION + 10; char tmp[25];
char tmp[size]; /// знаки, +0.0e+123\0 double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
int res = std::snprintf(tmp, size, "%.*g", WRITE_HELPERS_DEFAULT_FLOAT_PRECISION, x);
if (res >= static_cast<int>(size) || res <= 0) const auto result = getDoubleToStringConverter<false>().ToShortest(x, &builder);
if (!result)
throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER); throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
return res; return builder.position();
} }
template <typename T> template <typename T>

View File

@ -32,6 +32,10 @@
#include <DB/Parsers/formatAST.h> #include <DB/Parsers/formatAST.h>
#include <DB/Functions/FunctionFactory.h>
#include <statdaemons/ext/range.hpp>
namespace DB namespace DB
{ {
@ -589,6 +593,9 @@ static SharedPtr<InterpreterSelectQuery> interpretSubquery(
if (!parse_res) if (!parse_res)
throw Exception("Error in parsing SELECT query while creating set or join for table " + table->name + ".", throw Exception("Error in parsing SELECT query while creating set or join for table " + table->name + ".",
ErrorCodes::LOGICAL_ERROR); ErrorCodes::LOGICAL_ERROR);
/// @note it may be more appropriate to manually replace ASTAsterisk with table's columns
ExpressionAnalyzer{query, context, subquery_depth};
} }
else else
query = subquery->children.at(0); query = subquery->children.at(0);
@ -1055,7 +1062,7 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
} }
} }
FunctionPtr function = context.getFunctionFactory().get(node->name, context); const FunctionPtr & function = FunctionFactory::instance().get(node->name, context);
Names argument_names; Names argument_names;
DataTypes argument_types; DataTypes argument_types;
@ -1737,26 +1744,32 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd
return; return;
auto & node = typeid_cast<ASTJoin &>(*select_query->join); auto & node = typeid_cast<ASTJoin &>(*select_query->join);
auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
auto & table = node.table->children.at(0); /// TODO: поддержка идентификаторов.
size_t num_join_keys = keys.children.size(); Block nested_result_sample;
if (const auto identifier = typeid_cast<const ASTIdentifier *>(node.table.get()))
for (size_t i = 0; i < num_join_keys; ++i)
{ {
if (!join_key_names_left_set.insert(keys.children[i]->getColumnName()).second) const auto & table = context.getTable("", identifier->name);
nested_result_sample = table->getSampleBlockNonMaterialized();
}
else if (typeid_cast<const ASTSubquery *>(node.table.get()))
{
const auto & table = node.table->children.at(0);
nested_result_sample = ExpressionAnalyzer(table, context, subquery_depth + 1).getSelectSampleBlock();
}
auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
for (const auto & key : keys.children)
{
if (!join_key_names_left_set.insert(key->getColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN); throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
if (!join_key_names_right_set.insert(keys.children[i]->getAliasOrColumnName()).second) if (!join_key_names_right_set.insert(key->getAliasOrColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN); throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
} }
Block nested_result_sample = ExpressionAnalyzer(table, context, subquery_depth + 1).getSelectSampleBlock(); for (const auto i : ext::range(0, nested_result_sample.columns()))
size_t nested_result_columns = nested_result_sample.columns();
for (size_t i = 0; i < nested_result_columns; ++i)
{ {
auto col = nested_result_sample.getByPosition(i); const auto & col = nested_result_sample.getByPosition(i);
if (!join_key_names_right_set.count(col.name)) if (!join_key_names_right_set.count(col.name))
{ {
joined_columns.insert(col.name); joined_columns.insert(col.name);

File diff suppressed because it is too large Load Diff

View File

@ -101,7 +101,7 @@ struct FastHash64
uint64_t v; uint64_t v;
while (pos != end) { while (pos != end) {
v = *pos++; v = *pos++;
h ^= mix(v); h ^= mix(v);
h *= m; h *= m;
} }

Some files were not shown because too many files have changed in this diff Show More