dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-03-25 07:52:31 +00:00
parent 5a3649b108
commit 46b8a85e83
5 changed files with 44 additions and 18 deletions

View File

@ -44,6 +44,7 @@ protected:
UInt64 query_id = 0;
readIntBinary(query_id, in);
if (query_id != assert_query_id)
throw Exception("Received data for wrong query id", ErrorCodes::RECEIVED_DATA_FOR_WRONG_QUERY_ID);
@ -55,6 +56,9 @@ protected:
read_in_chunk = std::min(chunk_size, in.buffer().size() - in.offset());
working_buffer = Buffer(in.position(), in.position() + read_in_chunk);
in.position() += read_in_chunk;
if (all_read)
return false;
}
return true;

View File

@ -3,6 +3,8 @@
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/WriteHelpers.h>
#include <iostream>
namespace DB
{
@ -28,12 +30,18 @@ protected:
void nextImpl()
{
std::cerr << out.offset() << std::endl;
std::cerr << query_id << std::endl;
std::cerr << offset() << std::endl;
checkBufferSize();
writeIntBinary(query_id, out);
writeIntBinary(false, out);
writeIntBinary(offset(), out);
std::cerr << out.offset() << std::endl;
out.position() = position();
out.next();
working_buffer = Buffer(out.buffer().begin() + headerSize(), out.buffer().end());

View File

@ -92,7 +92,7 @@ public:
{
size_t bytes_copied = 0;
while (!eof() && bytes_copied < n)
while (bytes_copied < n && !eof())
{
size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
std::memcpy(to + bytes_copied, pos, bytes_to_copy);

View File

@ -1,6 +1,7 @@
#include <stdlib.h>
#include <readline/readline.h>
#include <readline/history.h>
#include <iostream>
@ -25,11 +26,11 @@
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/ChunkedReadBuffer.h>
#include <DB/IO/ChunkedWriteBuffer.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/executeQuery.h>
/** Клиент командной строки СУБД ClickHouse.
@ -39,14 +40,15 @@
namespace DB
{
using Poco::SharedPtr;
class Client : public Poco::Util::Application
{
public:
Client() : socket(), in(socket), out(socket), query_id(0), compression(Protocol::Compression::Enable), format_max_block_size(0), std_out(STDOUT_FILENO) {}
private:
using Poco::SharedPtr;
typedef std::tr1::unordered_set<String> StringSet;
StringSet exit_strings;
@ -63,6 +65,7 @@ private:
size_t format_max_block_size; /// Максимальный размер блока при выводе в консоль.
Context context;
Block empty_block;
/// Откуда читать результат выполнения запроса.
SharedPtr<ReadBuffer> chunked_in;
@ -75,7 +78,7 @@ private:
BlockOutputStreamPtr block_out;
/// Вывод в консоль
WriteBuffer std_out;
WriteBufferFromFileDescriptor std_out;
BlockOutputStreamPtr block_std_out;
@ -87,7 +90,8 @@ private:
("exit")("quit")("logout")
("учше")("йгше")("дщпщге")
("exit;")("quit;")("logout;")
("учше;")("йгше;")("дщпщге;");
("учше;")("йгше;")("дщпщге;")
("q")("й");
if (config().has("config-file"))
loadConfiguration(config().getString("config-file"));
@ -102,7 +106,10 @@ private:
int main(const std::vector<std::string> & args)
{
std::cout << "ClickHouse client. Revision: " << Revision::get() << "." << std::endl;
std::cout << "ClickHouse client version " << DBMS_VERSION_MAJOR
<< "." << DBMS_VERSION_MINOR
<< "." << Revision::get()
<< "." << std::endl;
compression = config().getBool("compression", true) ? Protocol::Compression::Enable : Protocol::Compression::Disable;
in_format = config().getString("in_format", "Native");
@ -137,7 +144,7 @@ private:
<< " server version " << server_version_major
<< "." << server_version_minor
<< "." << server_revision
<< std::endl;
<< "." << std::endl;
context.format_factory = new FormatFactory();
context.data_type_factory = new DataTypeFactory();
@ -158,6 +165,7 @@ private:
free(line_);
if (!process(line))
break;
add_history(line.c_str());
}
}
@ -174,6 +182,10 @@ private:
sendQuery();
receiveResult();
block_in = NULL;
maybe_compressed_in = NULL;
chunked_in = NULL;
return true;
}
@ -181,9 +193,6 @@ private:
void sendQuery()
{
UInt64 stage = Protocol::QueryProcessingStage::Complete;
UInt64 compression = Protocol::Compression::Enable;
String in_format = "Native";
String out_format = "Native";
writeVarInt(Protocol::Client::Query, out);
writeIntBinary(query_id, out);
@ -201,6 +210,8 @@ private:
{
while (receivePacket())
;
block_std_out = NULL;
}
@ -230,24 +241,23 @@ private:
? new CompressedReadBuffer(*chunked_in)
: chunked_in;
/// Проанализируем запрос
BlockIO analyze_result = executeQuery(query, context);
block_in = context.format_factory->getInput(
out_format,
*maybe_compressed_in,
analyze_result.out_sample,
empty_block,
format_max_block_size,
*context.data_type_factory);
block_std_out = context.format_factory->getOutput(format, std_out, analyze_result.out_sample);
}
/// Прочитать из сети один блок и засунуть его в state.io.out (данные для INSERT-а)
/// Прочитать из сети один блок и вывести его в консоль
Block block = block_in->read();
if (block)
{
if (!block_std_out)
block_std_out = context.format_factory->getOutput(format, std_out, block);
block_std_out->write(block);
std_out.next();
return true;
}
else

View File

@ -172,11 +172,15 @@ bool TCPHandler::sendData(WriteBuffer & out, WriteBuffer & out_for_chunks)
if (block)
{
state.block_out->write(block);
state.maybe_compressed_out->next();
state.chunked_out->next();
out_for_chunks.next();
return true;
}
else
{
dynamic_cast<ChunkedWriteBuffer &>(*state.chunked_out).finish();
out_for_chunks.next();
return false;
}
}