dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-10-20 02:10:47 +00:00
parent 6b787234ad
commit 1a7af177cd
28 changed files with 321 additions and 192 deletions

View File

@ -26,6 +26,11 @@ public:
children.push_back(input);
}
String getName() const { return "AddingDefaultBlockInputStream"; }
BlockInputStreamPtr clone() { return new AddingDefaultBlockInputStream(input, required_columns); }
protected:
Block readImpl()
{
Block res = input->read();
@ -43,15 +48,11 @@ public:
res.rows(), it->second->getDefault())).convertToFullColumn();
res.insert(col);
}
}
}
return res;
}
String getName() const { return "AddingDefaultBlockInputStream"; }
BlockInputStreamPtr clone() { return new AddingDefaultBlockInputStream(input, required_columns); }
private:
BlockInputStreamPtr input;
NamesAndTypesListPtr required_columns;

View File

@ -31,12 +31,13 @@ public:
*/
AggregatingBlockInputStream(BlockInputStreamPtr input_, ExpressionPtr expression);
Block readImpl();
String getName() const { return "AggregatingBlockInputStream"; }
BlockInputStreamPtr clone() { return new AggregatingBlockInputStream(*this); }
protected:
Block readImpl();
private:
AggregatingBlockInputStream(const AggregatingBlockInputStream & src)
: input(src.input), aggregator(src.aggregator), has_been_read(src.has_been_read) {}

View File

@ -48,6 +48,11 @@ public:
children.push_back(input);
}
String getName() const { return "ArrayJoiningBlockInputStream"; }
BlockInputStreamPtr clone() { return new ArrayJoiningBlockInputStream(input, array_column); }
protected:
Block readImpl()
{
Block block = input->read();
@ -67,7 +72,7 @@ public:
for (size_t i = 0; i < columns; ++i)
{
ColumnWithNameAndType & current = block.getByPosition(i);
if (static_cast<ssize_t>(i) == array_column)
{
ColumnWithNameAndType result;
@ -85,10 +90,6 @@ public:
return block;
}
String getName() const { return "ArrayJoiningBlockInputStream"; }
BlockInputStreamPtr clone() { return new ArrayJoiningBlockInputStream(input, array_column); }
private:
BlockInputStreamPtr input;
ssize_t array_column;

View File

@ -1,6 +1,8 @@
#pragma once
#include <statdaemons/threadpool.hpp>
#include <Poco/Mutex.h>
#include <Poco/Thread.h>
#include <Poco/Runnable.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
@ -8,83 +10,185 @@
namespace DB
{
/** Выполняет другой BlockInputStream в отдельном потоке, используя двойную буферизацию.
/** Thread-safe queue holding at most one element,
* designed for exactly one producer and one consumer.
*
* The two mutexes act as binary semaphores: mutex_fill is held while the
* slot contains data, mutex_empty while the slot is empty.
* NOTE(review): each mutex is locked in one thread and unlocked in the
* other; POSIX leaves unlock-by-non-owner undefined for mutexes — confirm
* that Poco::FastMutex tolerates this on all target platforms.
*/
template <typename T>
class OneElementQueue
{
private:
T data;
Poco::FastMutex mutex_fill; /// Held while the slot contains data.
Poco::FastMutex mutex_empty; /// Held while the slot is empty.
public:
OneElementQueue()
{
/// The queue starts empty, so take the "empty" semaphore up front.
mutex_empty.lock();
}
/// Block until the slot is free, then store x. Called only by the single producer.
void push(const T & x)
{
mutex_fill.lock();
data = x;
mutex_empty.unlock();
}
/// Block until an element is available, then copy it into x. Called only by the single consumer.
void pop(T & x)
{
mutex_empty.lock();
x = data;
mutex_fill.unlock();
}
/// Wait for an element for at most the given timeout (in milliseconds).
/// Returns true if an element is ready, i.e. pop() will not block. Called only by the single consumer.
bool poll(UInt64 milliseconds)
{
if (mutex_empty.tryLock(milliseconds))
{
/// Only probing for availability: release immediately so pop() can re-acquire.
mutex_empty.unlock();
return true;
}
return false;
}
};
/** Выполняет другой BlockInputStream в отдельном потоке.
* Это служит для двух целей:
* 1. Позволяет сделать так, чтобы разные стадии конвейера выполнения запроса работали параллельно.
* 2. Позволяет не ждать до того, как данные будут готовы, а периодически проверять их готовность без блокировки.
* Это нужно, например, чтобы можно было во время ожидания проверить, не пришёл ли по сети пакет с просьбой прервать выполнение запроса.
* Также это позволяет выполнить несколько запросов одновременно.
*/
class AsynchronousBlockInputStream : public IProfilingBlockInputStream
{
public:
AsynchronousBlockInputStream(BlockInputStreamPtr in_) : in(in_), pool(1), started(false)
AsynchronousBlockInputStream(BlockInputStreamPtr in_) : in(in_), started(false), runnable(*this)
{
children.push_back(in);
}
Block readImpl()
/** Ждать готовность данных не более заданного таймаута. Запустить получение данных, если нужно.
* Если функция вернула true - данные готовы и можно делать read().
*/
bool poll(UInt64 milliseconds)
{
/// Если вычислений ещё не было - вычислим первый блок синхронно
if (!started)
{
calculate();
started = true;
}
else /// Если вычисления уже идут - подождём результата
pool.wait();
if (exception)
exception->rethrow();
Block res = block;
if (!res)
return res;
/// Запустим вычисления следующего блока
block = Block();
pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this));
return res;
startIfNeed();
return output_queue.poll(milliseconds);
}
String getName() const { return "AsynchronousBlockInputStream"; }
BlockInputStreamPtr clone() { return new AsynchronousBlockInputStream(in); }
~AsynchronousBlockInputStream()
{
if (started)
pool.wait();
thread->join();
}
protected:
BlockInputStreamPtr in;
boost::threadpool::pool pool;
bool started;
Block block;
ExceptionPtr exception;
/// Вычисления, которые могут выполняться в отдельном потоке
void calculate()
Block readImpl()
{
try
OutputData res;
startIfNeed();
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
output_queue.pop(res);
if (res.exception)
res.exception->rethrow();
return res.block;
}
void startIfNeed()
{
if (!started)
{
block = in->read();
}
catch (const Exception & e)
{
exception = e.clone();
}
catch (const Poco::Exception & e)
{
exception = e.clone();
}
catch (const std::exception & e)
{
exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
}
catch (...)
{
exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
thread = new Poco::Thread;
thread->start(runnable);
}
}
/// The computation that may run in a separate thread.
class Thread : public Poco::Runnable
{
public:
Thread(AsynchronousBlockInputStream & parent_) : parent(parent_) {}
/// Thread entry point: run loop() and forward any exception to the consumer.
void run()
{
ExceptionPtr exception;
try
{
loop();
}
catch (const Exception & e)
{
exception = e.clone();
}
catch (const Poco::Exception & e)
{
exception = e.clone();
}
catch (const std::exception & e)
{
exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
}
catch (...)
{
exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
if (exception)
{
parent.cancel();
/// Hand the exception over to the main (consumer) thread.
parent.output_queue.push(exception);
}
}
/// Read blocks from the wrapped stream and publish each one to the output queue.
/// NOTE(review): nothing visible here pushes an end-of-stream marker after this
/// loop finishes normally — confirm the consumer cannot block forever in pop().
void loop()
{
while (Block res = parent.in->read())
parent.output_queue.push(res);
}
private:
AsynchronousBlockInputStream & parent;
};
BlockInputStreamPtr in;
bool started;
/// What the worker thread hands to the consumer: either a block of data
/// or an exception to be rethrown. The converting constructors are
/// intentionally implicit so both kinds can be pushed into the queue directly.
struct OutputData
{
Block block;
ExceptionPtr exception;
OutputData() {}
OutputData(Block & block_) : block(block_) {}
OutputData(ExceptionPtr & exception_) : exception(exception_) {}
};
OneElementQueue<OutputData> output_queue;
Thread runnable;
SharedPtr<Poco::Thread> thread;
};
}

View File

@ -20,7 +20,6 @@ public:
const Block & sample_,
size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
Block readImpl();
void readPrefix() { row_input->readPrefix(); }
void readSuffix() { row_input->readSuffix(); }
@ -28,6 +27,9 @@ public:
BlockInputStreamPtr clone() { return new BlockInputStreamFromRowInputStream(row_input, sample, max_block_size); }
protected:
Block readImpl();
private:
RowInputStreamPtr row_input;
const Block sample;

View File

@ -32,13 +32,14 @@ public:
{
}
/// Может возвращаться на 1 больше записей, чем max_block_size.
Block readImpl();
String getName() const { return "CollapsingSortedBlockInputStream"; }
BlockInputStreamPtr clone() { return new CollapsingSortedBlockInputStream(inputs, description, sign_column, max_block_size); }
protected:
/// Может возвращаться на 1 больше записей, чем max_block_size.
Block readImpl();
private:
String sign_column;
size_t sign_column_number;

View File

@ -22,6 +22,11 @@ public:
current_stream = children.begin();
}
String getName() const { return "ConcatBlockInputStream"; }
BlockInputStreamPtr clone() { return new ConcatBlockInputStream(children); }
protected:
Block readImpl()
{
Block res;
@ -39,10 +44,6 @@ public:
return res;
}
String getName() const { return "ConcatBlockInputStream"; }
BlockInputStreamPtr clone() { return new ConcatBlockInputStream(children); }
private:
BlockInputStreams::iterator current_stream;
};

View File

@ -28,6 +28,11 @@ public:
children.push_back(input);
}
String getName() const { return "ExpressionBlockInputStream"; }
BlockInputStreamPtr clone() { return new ExpressionBlockInputStream(input, expression, part_id); }
protected:
Block readImpl()
{
Block res = input->read();
@ -38,10 +43,6 @@ public:
return res;
}
String getName() const { return "ExpressionBlockInputStream"; }
BlockInputStreamPtr clone() { return new ExpressionBlockInputStream(input, expression, part_id); }
private:
BlockInputStreamPtr input;
ExpressionPtr expression;

View File

@ -21,12 +21,14 @@ public:
/// filter_column_ - номер столбца с условиями фильтрации.
FilterBlockInputStream(BlockInputStreamPtr input_, ssize_t filter_column_);
FilterBlockInputStream(BlockInputStreamPtr input_, const String & filter_column_name_);
Block readImpl();
String getName() const { return "FilterBlockInputStream"; }
BlockInputStreamPtr clone() { return new FilterBlockInputStream(input, filter_column); }
protected:
Block readImpl();
private:
BlockInputStreamPtr input;
ssize_t filter_column;

View File

@ -22,6 +22,11 @@ public:
children.push_back(input);
}
String getName() const { return "FinalizingAggregatedBlockInputStream"; }
BlockInputStreamPtr clone() { return new FinalizingAggregatedBlockInputStream(input); }
protected:
Block readImpl()
{
Block res = input->read();
@ -42,7 +47,7 @@ public:
for (size_t j = 0; j < rows; ++j)
finalized_column->insert(data[j]->getResult());
column.column = finalized_column;
}
}
@ -50,10 +55,6 @@ public:
return res;
}
String getName() const { return "FinalizingAggregatedBlockInputStream"; }
BlockInputStreamPtr clone() { return new FinalizingAggregatedBlockInputStream(input); }
private:
BlockInputStreamPtr input;
};

View File

@ -49,9 +49,6 @@ public:
Block read();
/// Наследники должны реализовать эту функцию.
virtual Block readImpl() = 0;
/// Получить информацию о скорости выполнения.
const BlockStreamProfileInfo & getInfo() const;
@ -96,6 +93,9 @@ protected:
volatile bool is_cancelled;
IsCancelledCallback is_cancelled_callback;
ProgressCallback progress_callback;
/// Наследники должны реализовать эту функцию.
virtual Block readImpl() = 0;
};
}

View File

@ -17,12 +17,14 @@ class LimitBlockInputStream : public IProfilingBlockInputStream
{
public:
LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_ = 0);
Block readImpl();
String getName() const { return "LimitBlockInputStream"; }
BlockInputStreamPtr clone() { return new LimitBlockInputStream(input, limit, offset); }
protected:
Block readImpl();
private:
BlockInputStreamPtr input;
size_t limit;

View File

@ -19,6 +19,11 @@ public:
children.push_back(input);
}
String getName() const { return "MaterializingBlockInputStream"; }
BlockInputStreamPtr clone() { return new MaterializingBlockInputStream(input); }
protected:
Block readImpl()
{
Block res = input->read();
@ -37,10 +42,6 @@ public:
return res;
}
String getName() const { return "MaterializingBlockInputStream"; }
BlockInputStreamPtr clone() { return new MaterializingBlockInputStream(input); }
private:
BlockInputStreamPtr input;
};

View File

@ -21,12 +21,13 @@ public:
children.push_back(input);
}
Block readImpl();
String getName() const { return "MergeSortingBlockInputStream"; }
BlockInputStreamPtr clone() { return new MergeSortingBlockInputStream(input, description); }
protected:
Block readImpl();
private:
BlockInputStreamPtr input;
SortDescription description;

View File

@ -29,12 +29,13 @@ public:
*/
MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, ExpressionPtr expression);
Block readImpl();
String getName() const { return "MergingAggregatedBlockInputStream"; }
BlockInputStreamPtr clone() { return new MergingAggregatedBlockInputStream(*this); }
protected:
Block readImpl();
private:
MergingAggregatedBlockInputStream(const MergingAggregatedBlockInputStream & src)
: input(src.input), aggregator(src.aggregator), has_been_read(src.has_been_read) {}

View File

@ -24,7 +24,6 @@ public:
children.insert(children.end(), inputs.begin(), inputs.end());
}
Block readImpl();
void readSuffix();
String getName() const { return "MergingSortedBlockInputStream"; }
@ -32,6 +31,8 @@ public:
BlockInputStreamPtr clone() { return new MergingSortedBlockInputStream(inputs, description, max_block_size); }
protected:
Block readImpl();
/// Инициализирует очередь и следующий блок результата.
void init(Block & merged_block, ColumnPlainPtrs & merged_columns);

View File

@ -16,15 +16,13 @@ public:
NativeBlockInputStream(ReadBuffer & istr_, const DataTypeFactory & data_type_factory_)
: istr(istr_), data_type_factory(data_type_factory_) {}
/** Прочитать следующий блок.
* Если блоков больше нет - вернуть пустой блок (для которого operator bool возвращает false).
*/
Block readImpl();
String getName() const { return "NativeBlockInputStream"; }
BlockInputStreamPtr clone() { return new NativeBlockInputStream(istr, data_type_factory); }
protected:
Block readImpl();
private:
ReadBuffer & istr;
const DataTypeFactory & data_type_factory;

View File

@ -18,6 +18,11 @@ class OneBlockInputStream : public IProfilingBlockInputStream
public:
OneBlockInputStream(Block & block_) : block(block_), has_been_read(false) {}
String getName() const { return "OneBlockInputStream"; }
BlockInputStreamPtr clone() { return new OneBlockInputStream(block); }
protected:
Block readImpl()
{
if (has_been_read)
@ -27,10 +32,6 @@ public:
return block;
}
String getName() const { return "OneBlockInputStream"; }
BlockInputStreamPtr clone() { return new OneBlockInputStream(block); }
private:
Block block;
bool has_been_read;

View File

@ -42,6 +42,11 @@ public:
aggregator = new Aggregator(key_names, aggregates);
}
String getName() const { return "ParallelAggregatingBlockInputStream"; }
BlockInputStreamPtr clone() { return new ParallelAggregatingBlockInputStream(*this); }
protected:
Block readImpl()
{
if (has_been_read)
@ -51,7 +56,7 @@ public:
ManyAggregatedDataVariants many_data(inputs.size());
Exceptions exceptions(inputs.size());
for (size_t i = 0, size = many_data.size(); i < size; ++i)
{
many_data[i] = new AggregatedDataVariants;
@ -62,15 +67,11 @@ public:
for (size_t i = 0, size = exceptions.size(); i < size; ++i)
if (exceptions[i])
exceptions[i]->rethrow();
AggregatedDataVariantsPtr res = aggregator->merge(many_data);
return aggregator->convertToBlock(*res);
}
String getName() const { return "ParallelAggregatingBlockInputStream"; }
BlockInputStreamPtr clone() { return new ParallelAggregatingBlockInputStream(*this); }
private:
ParallelAggregatingBlockInputStream(const ParallelAggregatingBlockInputStream & src)
: inputs(src.inputs), aggregator(src.aggregator), has_been_read(src.has_been_read) {}

View File

@ -20,12 +20,13 @@ public:
children.push_back(input);
}
Block readImpl();
String getName() const { return "PartialSortingBlockInputStream"; }
BlockInputStreamPtr clone() { return new PartialSortingBlockInputStream(input, description); }
protected:
Block readImpl();
private:
BlockInputStreamPtr input;
SortDescription description;

View File

@ -29,6 +29,11 @@ public:
children.push_back(input);
}
String getName() const { return "ProjectionBlockInputStream"; }
BlockInputStreamPtr clone() { return new ProjectionBlockInputStream(input, expression, without_duplicates_and_aliases, part_id, subtree); }
protected:
Block readImpl()
{
Block res = input->read();
@ -38,10 +43,6 @@ public:
return expression->projectResult(res, without_duplicates_and_aliases, part_id, subtree);
}
String getName() const { return "ProjectionBlockInputStream"; }
BlockInputStreamPtr clone() { return new ProjectionBlockInputStream(input, expression, without_duplicates_and_aliases, part_id, subtree); }
private:
BlockInputStreamPtr input;
ExpressionPtr expression;

View File

@ -23,50 +23,6 @@ public:
}
Block readImpl()
{
if (!sent_query)
{
connection.sendQuery(query, 1, stage);
sent_query = true;
}
while (true)
{
Connection::Packet packet = connection.receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
if (packet.block)
return packet.block;
break; /// Если блок пустой - получим другие пакеты до EndOfStream.
case Protocol::Server::Exception:
got_exception_from_server = true;
packet.exception->rethrow();
break;
case Protocol::Server::EndOfStream:
finished = true;
return Block();
case Protocol::Server::Progress:
if (progress_callback)
progress_callback(packet.progress.rows, packet.progress.bytes);
if (!was_cancelled && !finished && isCancelled())
cancel();
break;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
}
String getName() const { return "RemoteBlockInputStream"; }
BlockInputStreamPtr clone() { return new RemoteBlockInputStream(connection, query, stage); }
@ -143,6 +99,50 @@ public:
}
}
protected:
/** Read the next block from the remote server.
* Sends the query lazily on the first call, then processes incoming packets
* until a non-empty data block, a server exception, or the end of the stream.
*/
Block readImpl()
{
if (!sent_query)
{
connection.sendQuery(query, 1, stage);
sent_query = true;
}
while (true)
{
Connection::Packet packet = connection.receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
if (packet.block)
return packet.block;
break; /// If the block is empty — keep receiving packets until EndOfStream.
case Protocol::Server::Exception:
got_exception_from_server = true;
packet.exception->rethrow();
break;
case Protocol::Server::EndOfStream:
finished = true;
return Block(); /// An empty block signals end of data to the caller.
case Protocol::Server::Progress:
/// Report progress and use the opportunity to check for cancellation.
if (progress_callback)
progress_callback(packet.progress.rows, packet.progress.bytes);
if (!was_cancelled && !finished && isCancelled())
cancel();
break;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
}
private:
Connection & connection;
const String query;

View File

@ -92,12 +92,30 @@ public:
}
}
String getName() const { return "UnionBlockInputStream"; }
BlockInputStreamPtr clone() { return new UnionBlockInputStream(children, max_threads); }
~UnionBlockInputStream()
{
LOG_TRACE(log, "Waiting for threads to finish");
finish = true;
cancel();
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
it->thread->join();
LOG_TRACE(log, "Waited for threads to finish");
}
protected:
Block readImpl()
{
OutputData res;
if (finish)
return res.block;
/// Запускаем потоки, если это ещё не было сделано.
if (threads_data.empty())
{
@ -119,23 +137,6 @@ public:
return res.block;
}
String getName() const { return "UnionBlockInputStream"; }
BlockInputStreamPtr clone() { return new UnionBlockInputStream(children, max_threads); }
~UnionBlockInputStream()
{
LOG_TRACE(log, "Waiting for threads to finish");
finish = true;
cancel();
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
it->thread->join();
LOG_TRACE(log, "Waited for threads to finish");
}
private:
/// Данные отдельного источника
struct InputData

View File

@ -33,9 +33,10 @@ class LogBlockInputStream : public IProfilingBlockInputStream
{
public:
LogBlockInputStream(size_t block_size_, const Names & column_names_, StorageLog & storage_, size_t mark_number_, size_t rows_limit_);
Block readImpl();
String getName() const { return "LogBlockInputStream"; }
BlockInputStreamPtr clone() { return new LogBlockInputStream(block_size, column_names, storage, mark_number, rows_limit); }
protected:
Block readImpl();
private:
size_t block_size;
Names column_names;

View File

@ -14,9 +14,10 @@ class MemoryBlockInputStream : public IProfilingBlockInputStream
{
public:
MemoryBlockInputStream(const Names & column_names_, Blocks::iterator begin_, Blocks::iterator end_);
Block readImpl();
String getName() const { return "MemoryBlockInputStream"; }
BlockInputStreamPtr clone() { return new MemoryBlockInputStream(column_names, begin, end); }
protected:
Block readImpl();
private:
Names column_names;
Blocks::iterator begin;

View File

@ -16,9 +16,10 @@ class NumbersBlockInputStream : public IProfilingBlockInputStream
{
public:
NumbersBlockInputStream(size_t block_size_);
Block readImpl();
String getName() const { return "NumbersBlockInputStream"; }
BlockInputStreamPtr clone() { return new NumbersBlockInputStream(block_size); }
protected:
Block readImpl();
private:
size_t block_size;
UInt64 next;

View File

@ -21,9 +21,10 @@ class TinyLogBlockInputStream : public IProfilingBlockInputStream
{
public:
TinyLogBlockInputStream(size_t block_size_, const Names & column_names_, StorageTinyLog & storage_);
Block readImpl();
String getName() const { return "TinyLogBlockInputStream"; }
BlockInputStreamPtr clone() { return new TinyLogBlockInputStream(block_size, column_names, storage); }
protected:
Block readImpl();
private:
size_t block_size;
Names column_names;

View File

@ -604,6 +604,14 @@ public:
{
}
String getName() const { return "MergeTreeBlockInputStream"; }
BlockInputStreamPtr clone()
{
return new MergeTreeBlockInputStream(path, block_size, column_names, storage, owned_data_part, mark_number, rows_limit);
}
protected:
Block readImpl()
{
Block res;
@ -645,13 +653,6 @@ public:
return res;
}
String getName() const { return "MergeTreeBlockInputStream"; }
BlockInputStreamPtr clone()
{
return new MergeTreeBlockInputStream(path, block_size, column_names, storage, owned_data_part, mark_number, rows_limit);
}
private:
const String path;