dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-05-23 19:51:30 +00:00
parent 039cf5ab06
commit fa7a42842f
8 changed files with 31 additions and 9 deletions

View File

@ -31,8 +31,9 @@ class Connection
public: public:
Connection(const String & host_, UInt16 port_, Connection(const String & host_, UInt16 port_,
DataTypeFactory & data_type_factory_, DataTypeFactory & data_type_factory_,
const String & client_name_ = "client",
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable) Protocol::Compression::Enum compression_ = Protocol::Compression::Enable)
: host(host_), port(port_), connected(false), : host(host_), port(port_), client_name(client_name_), connected(false),
server_version_major(0), server_version_minor(0), server_revision(0), server_version_major(0), server_version_minor(0), server_revision(0),
socket(), in(new ReadBufferFromPocoSocket(socket)), out(new WriteBufferFromPocoSocket(socket)), socket(), in(new ReadBufferFromPocoSocket(socket)), out(new WriteBufferFromPocoSocket(socket)),
query_id(0), compression(compression_), data_type_factory(data_type_factory_) query_id(0), compression(compression_), data_type_factory(data_type_factory_)
@ -71,6 +72,8 @@ private:
String host; String host;
UInt16 port; UInt16 port;
String client_name;
bool connected; bool connected;
String server_name; String server_name;

View File

@ -49,6 +49,12 @@ namespace Protocol
Pong = 4, /// Ответ на Ping. Pong = 4, /// Ответ на Ping.
EndOfStream = 5, /// Все пакеты были переданы. EndOfStream = 5, /// Все пакеты были переданы.
}; };
inline const char * toString(Enum packet)
{
static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream" };
return data[packet];
}
} }
/// То, что передаёт клиент. /// То, что передаёт клиент.
@ -64,6 +70,12 @@ namespace Protocol
Cancel = 3, /// Отменить выполнение запроса. Cancel = 3, /// Отменить выполнение запроса.
Ping = 4, /// Проверка живости соединения с сервером. Ping = 4, /// Проверка живости соединения с сервером.
}; };
inline const char * toString(Enum packet)
{
static const char * data[] = { "Hello", "Query", "Data", "Cancel", "Ping" };
return data[packet];
}
} }
/// Использовать ли сжатие. /// Использовать ли сжатие.

View File

@ -13,6 +13,12 @@ namespace QueryProcessingStage
WithMergeableState = 1, /// До стадии, когда результаты обработки на разных серверах можно объединить. WithMergeableState = 1, /// До стадии, когда результаты обработки на разных серверах можно объединить.
Complete = 2, /// Полностью. Complete = 2, /// Полностью.
}; };
inline const char * toString(Enum stage)
{
static const char * data[] = { "FetchColumns", "WithMergeableState", "Complete" };
return data[stage];
}
} }
} }

View File

@ -66,7 +66,7 @@ public:
private: private:
Connection & connection; Connection & connection;
const String & query; const String query;
QueryProcessingStage::Enum stage; QueryProcessingStage::Enum stage;
bool sent_query; bool sent_query;

View File

@ -282,7 +282,7 @@ private:
if (is_interactive) if (is_interactive)
std::cout << "Connecting to " << host << ":" << port << "." << std::endl; std::cout << "Connecting to " << host << ":" << port << "." << std::endl;
connection = new Connection(host, port, *context.data_type_factory, compression); connection = new Connection(host, port, *context.data_type_factory, "client", compression);
if (is_interactive) if (is_interactive)
{ {

View File

@ -31,7 +31,7 @@ void Connection::connect()
void Connection::sendHello() void Connection::sendHello()
{ {
writeVarUInt(Protocol::Client::Hello, *out); writeVarUInt(Protocol::Client::Hello, *out);
writeStringBinary(String(DBMS_NAME) + " client", *out); writeStringBinary((DBMS_NAME " ") + client_name, *out);
writeVarUInt(DBMS_VERSION_MAJOR, *out); writeVarUInt(DBMS_VERSION_MAJOR, *out);
writeVarUInt(DBMS_VERSION_MINOR, *out); writeVarUInt(DBMS_VERSION_MINOR, *out);
writeVarUInt(Revision::get(), *out); writeVarUInt(Revision::get(), *out);
@ -102,7 +102,9 @@ bool Connection::ping()
readVarUInt(pong, *in); readVarUInt(pong, *in);
if (pong != Protocol::Server::Pong) if (pong != Protocol::Server::Pong)
throw Exception("Unknown packet from server (expected Pong)", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); throw Exception("Unexpected packet from server (expected Pong, got "
+ String(Protocol::Server::toString(Protocol::Server::Enum(pong))) + ")",
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
return true; return true;
} }

View File

@ -50,6 +50,7 @@ void TCPHandler::runImpl()
LOG_DEBUG(log, "Query ID: " << state.query_id); LOG_DEBUG(log, "Query ID: " << state.query_id);
LOG_DEBUG(log, "Query: " << state.query); LOG_DEBUG(log, "Query: " << state.query);
LOG_DEBUG(log, "Requested stage: " << QueryProcessingStage::toString(state.stage));
/// Запрос требует приёма данных от клиента? /// Запрос требует приёма данных от клиента?
if (state.io.out) if (state.io.out)

View File

@ -20,7 +20,7 @@ StorageDistributed::StorageDistributed(
data_type_factory(data_type_factory_) data_type_factory(data_type_factory_)
{ {
for (Addresses::const_iterator it = addresses.begin(); it != addresses.end(); ++it) for (Addresses::const_iterator it = addresses.begin(); it != addresses.end(); ++it)
connections.push_back(new Connection(it->host().toString(), it->port(), data_type_factory)); connections.push_back(new Connection(it->host().toString(), it->port(), data_type_factory, "server"));
} }
@ -45,8 +45,6 @@ BlockInputStreams StorageDistributed::read(
formatAST(select, s, 0, false); formatAST(select, s, 0, false);
String modified_query = s.str(); String modified_query = s.str();
std::cerr << modified_query << std::endl;
BlockInputStreams res; BlockInputStreams res;
for (Connections::iterator it = connections.begin(); it != connections.end(); ++it) for (Connections::iterator it = connections.begin(); it != connections.end(); ++it)