dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-05-09 15:15:45 +00:00
parent 4217baebdf
commit 65b3d8f761
9 changed files with 135 additions and 24 deletions

View File

@ -8,3 +8,4 @@
#define DEFAULT_MAX_QUERY_SIZE 1048576
#define SHOW_CHARS_ON_SYNTAX_ERROR 160L
#define DEFAULT_MAX_THREADS 8
#define DEFAULT_INTERACTIVE_DELAY 100000

View File

@ -16,7 +16,7 @@ namespace Protocol
Hello = 0, /// Имя, версия, ревизия.
Data = 1, /// Идентификатор запроса, признак последнего чанка, размер чанка, часть данных со сжатием или без.
Exception = 2, /// Исключение во время обработки запроса.
Progress = 3, /// Прогресс выполнения запроса: строк считано, всего строк, байт считано, всего байт.
Progress = 3, /// Прогресс выполнения запроса: строк считано, байт считано.
Ok = 4, /// Запрос без возвращаемого результата успешно выполнен.
Pong = 5, /// Ответ на Ping.
};

View File

@ -32,7 +32,7 @@ struct BlockStreamProfileInfo
BlockStreamProfileInfo() : started(false), rows(0), blocks(0), bytes(0) {}
void update(Block & block);
void update(Block & block, size_t bytes_);
void print(std::ostream & ostr) const;
};
@ -59,9 +59,18 @@ public:
typedef boost::function<bool()> IsCancelledCallback;
void setIsCancelledCallback(IsCancelledCallback callback);
/** Установить колбэк прогресса выполнения.
* Колбэк пробрасывается во все листовые источники и вызывается там после каждого блока.
* Функция принимает количество строк в последнем блоке, количество байт в последнем блоке.
* Следует иметь ввиду, что колбэк может вызываться из разных потоков.
*/
typedef boost::function<void(size_t, size_t)> ProgressCallback;
void setProgressCallback(ProgressCallback callback);
private:
BlockStreamProfileInfo info;
IsCancelledCallback is_cancelled_callback;
ProgressCallback progress_callback;
};
}

View File

@ -14,8 +14,14 @@ struct Settings
size_t max_threads; /// Максимальное количество потоков выполнения запроса
size_t max_query_size; /// Какую часть запроса можно прочитать в оперативку для парсинга (оставшиеся данные для INSERT, если есть, считываются позже)
bool asynchronous; /// Выполнять разные стадии конвейера выполнения запроса параллельно
size_t interactive_delay; /// Интервал в микросекундах для проверки, не запрошена ли остановка выполнения запроса, и отправки прогресса.
Settings() : max_block_size(DEFAULT_BLOCK_SIZE), max_threads(DEFAULT_MAX_THREADS), max_query_size(DEFAULT_MAX_QUERY_SIZE), asynchronous(true) {}
Settings() :
max_block_size(DEFAULT_BLOCK_SIZE),
max_threads(DEFAULT_MAX_THREADS),
max_query_size(DEFAULT_MAX_QUERY_SIZE),
asynchronous(true),
interactive_delay(DEFAULT_INTERACTIVE_DELAY) {}
};

View File

@ -117,7 +117,7 @@ class Client : public Poco::Util::Application
{
public:
Client() : is_interactive(true), stdin_is_not_tty(false), socket(), in(socket), out(socket), query_id(0), compression(Protocol::Compression::Enable),
format_max_block_size(0), std_in(STDIN_FILENO), std_out(STDOUT_FILENO), received_rows(0) {}
format_max_block_size(0), std_in(STDIN_FILENO), std_out(STDOUT_FILENO), received_rows(0), written_progress_chars(0) {}
private:
typedef std::tr1::unordered_set<String> StringSet;
@ -173,6 +173,8 @@ private:
ASTPtr parsed_query;
bool expect_result; /// Запрос предполагает получение результата.
size_t written_progress_chars;
void initialize(Poco::Util::Application & self)
{
@ -613,6 +615,10 @@ private:
receiveOk();
return false;
case Protocol::Server::Progress:
receiveProgress();
return true;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
@ -636,6 +642,12 @@ private:
*context.data_type_factory);
}
if (written_progress_chars)
{
std::cerr << std::string(written_progress_chars, '\b');
written_progress_chars = 0;
}
/// Прочитать из сети один блок и вывести его в консоль
Block block = block_in->read();
if (block)
@ -685,6 +697,36 @@ private:
if (is_interactive)
std::cout << "Ok." << std::endl;
}
void receiveProgress()
{
size_t rows = 0;
size_t bytes = 0;
readVarUInt(rows, in);
readVarUInt(bytes, in);
static size_t increment = 0;
static const char * indicators[8] =
{
"\033[1;30m.\033[0m",
"\033[1;31m.\033[0m",
"\033[1;32m.\033[0m",
"\033[1;33m.\033[0m",
"\033[1;34m.\033[0m",
"\033[1;35m.\033[0m",
"\033[1;36m.\033[0m",
"\033[1;37m.\033[0m",
};
if (is_interactive)
{
//std::cout << "Progress: " << rows << " rows, " << bytes << " bytes." << std::endl;
std::cerr << indicators[increment % 8];
++increment;
++written_progress_chars;
}
}
void defineOptions(Poco::Util::OptionSet & options)

View File

@ -10,12 +10,11 @@ namespace DB
{
void BlockStreamProfileInfo::update(Block & block)
void BlockStreamProfileInfo::update(Block & block, size_t bytes_)
{
++blocks;
rows += block.rows();
for (size_t i = 0; i < block.columns(); ++i)
bytes += block.getByPosition(i).column->byteSize();
bytes += bytes_;
if (column_names.empty())
column_names = block.dumpNames();
@ -109,9 +108,16 @@ Block IProfilingBlockInputStream::read()
std::cerr << std::endl;
}*/
size_t bytes = 0;
for (size_t i = 0; i < res.columns(); ++i)
bytes += res.getByPosition(i).column->byteSize();
if (res)
info.update(res);
info.update(res, bytes);
if (progress_callback)
progress_callback(res.rows(), bytes);
return res;
}
@ -125,12 +131,19 @@ const BlockStreamProfileInfo & IProfilingBlockInputStream::getInfo() const
void IProfilingBlockInputStream::setIsCancelledCallback(IsCancelledCallback callback)
{
is_cancelled_callback = callback;
BlockInputStreams leaves = getLeaves();
for (BlockInputStreams::iterator it = leaves.begin(); it != leaves.end(); ++it)
if (IProfilingBlockInputStream * leaf = dynamic_cast<IProfilingBlockInputStream *>(&**it))
leaf->setIsCancelledCallback(callback);
leaf->is_cancelled_callback = callback;
}
void IProfilingBlockInputStream::setProgressCallback(ProgressCallback callback)
{
BlockInputStreams leaves = getLeaves();
for (BlockInputStreams::iterator it = leaves.begin(); it != leaves.end(); ++it)
if (IProfilingBlockInputStream * leaf = dynamic_cast<IProfilingBlockInputStream *>(&**it))
leaf->progress_callback = callback;
}

View File

@ -84,6 +84,7 @@ int Server::main(const std::vector<std::string> & args)
global_context.settings.max_block_size = config.getInt("max_block_size", global_context.settings.max_block_size);
global_context.settings.max_query_size = config.getInt("max_query_size", global_context.settings.max_query_size);
global_context.settings.max_threads = config.getInt("max_threads", global_context.settings.max_threads);
global_context.settings.interactive_delay = config.getInt("interactive_delay", global_context.settings.interactive_delay);
Poco::Net::ServerSocket http_socket(Poco::Net::SocketAddress("[::]:" + config.getString("http_port")));
Poco::Net::ServerSocket tcp_socket(Poco::Net::SocketAddress("[::]:" + config.getString("tcp_port")));

View File

@ -8,8 +8,6 @@
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBufferFromPocoSocket.h>
#include <DB/IO/WriteBufferFromPocoSocket.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/copyData.h>
@ -39,17 +37,18 @@ void TCPHandler::runImpl()
state.reset();
try
{
{
/// Пакет с запросом.
receivePacket(in, out);
after_check_cancelled.restart();
after_send_progress.restart();
LOG_DEBUG(log, "Query ID: " << state.query_id);
LOG_DEBUG(log, "Query: " << state.query);
LOG_DEBUG(log, "In format: " << state.in_format);
LOG_DEBUG(log, "Out format: " << state.out_format);
state.exception = NULL;
/// Читаем из сети данные для INSERT-а, если надо, и вставляем их.
if (state.io.out)
{
@ -61,8 +60,11 @@ void TCPHandler::runImpl()
if (state.io.in)
{
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*state.io.in))
{
profiling_in->setIsCancelledCallback(boost::bind(&TCPHandler::isQueryCancelled, this, boost::ref(in)));
profiling_in->setProgressCallback(boost::bind(&TCPHandler::sendProgress, this, boost::ref(out), 0, 0));
}
while (sendData(out, out_for_chunks))
;
}
@ -197,12 +199,17 @@ bool TCPHandler::receiveData(ReadBuffer & in)
bool TCPHandler::isQueryCancelled(ReadBufferFromPocoSocket & in)
{
Poco::ScopedLock<Poco::FastMutex> lock(is_cancelled_mutex);
if (after_check_cancelled.elapsed() / 1000 < state.context.settings.interactive_delay)
return false;
after_check_cancelled.restart();
/// Во время выполнения запроса, единственный пакет, который может прийти от клиента - это остановка выполнения запроса.
std::cerr << "checking cancelled" << std::endl;
if (in.poll(0))
{
Poco::ScopedLock<Poco::FastMutex> lock(is_cancelled_mutex);
std::cerr << "checking cancelled; socket has data" << std::endl;
UInt64 packet_type = 0;
@ -228,6 +235,8 @@ bool TCPHandler::sendData(WriteBuffer & out, WriteBuffer & out_for_chunks)
{
/// Получить один блок результата выполнения запроса из state.io.in.
Block block = state.io.in->read();
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
writeVarUInt(Protocol::Server::Data, out);
out.next();
@ -260,12 +269,15 @@ bool TCPHandler::sendData(WriteBuffer & out, WriteBuffer & out_for_chunks)
{
dynamic_cast<ChunkedWriteBuffer &>(*state.chunked_out).finish();
out_for_chunks.next();
state.sent_all_data = true;
return false;
}
}
void TCPHandler::sendException(WriteBuffer & out)
{
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
writeVarUInt(Protocol::Server::Exception, out);
writeException(*state.exception, out);
out.next();
@ -273,13 +285,29 @@ void TCPHandler::sendException(WriteBuffer & out)
void TCPHandler::sendOk(WriteBuffer & out)
{
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
writeVarUInt(Protocol::Server::Ok, out);
out.next();
}
void TCPHandler::sendProgress(WriteBuffer & out)
void TCPHandler::sendProgress(WriteBuffer & out, size_t rows, size_t bytes)
{
/// TODO
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
/// Не будем отправлять прогресс после того, как отправлены все данные.
if (state.sent_all_data)
return;
if (after_send_progress.elapsed() / 1000 < state.context.settings.interactive_delay)
return;
after_send_progress.restart();
writeVarUInt(Protocol::Server::Progress, out);
writeVarUInt(rows, out);
writeVarUInt(bytes, out);
out.next();
}

View File

@ -8,9 +8,12 @@
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadBufferFromPocoSocket.h>
#include <DB/IO/WriteBufferFromPocoSocket.h>
#include <DB/DataStreams/BlockIO.h>
#include <statdaemons/Stopwatch.h>
#include "Server.h"
@ -50,9 +53,11 @@ struct QueryState
* Клиент сможет его принять, если оно не произошло во время отправки другого пакета.
*/
SharedPtr<Exception> exception;
QueryState() : query_id(0), stage(QueryProcessingStage::Complete), compression(Protocol::Compression::Disable) {}
/// Данные были отправлены.
bool sent_all_data;
QueryState() : query_id(0), stage(QueryProcessingStage::Complete), compression(Protocol::Compression::Disable), sent_all_data(false) {}
void reset()
{
@ -86,6 +91,12 @@ private:
QueryState state;
Poco::FastMutex is_cancelled_mutex;
/// Для сериализации пакетов "данные" и "прогресс" (пакет типа "прогресс" может отправляться из другого потока).
Poco::FastMutex send_mutex;
/// Время после последней проверки остановки запроса и отправки прогресса.
Stopwatch after_check_cancelled;
Stopwatch after_send_progress;
void runImpl();
@ -93,7 +104,7 @@ private:
void sendHello(WriteBuffer & out);
bool sendData(WriteBuffer & out, WriteBuffer & out_for_chunks);
void sendException(WriteBuffer & out);
void sendProgress(WriteBuffer & out);
void sendProgress(WriteBuffer & out, size_t rows, size_t bytes);
void sendOk(WriteBuffer & out);
bool receivePacket(ReadBuffer & in, WriteBuffer & out);