client: external data sending from client to server [METR-10071]

This commit is contained in:
Sergey Fedorov 2014-03-06 18:02:20 +04:00
parent 4ffc3fbdf7
commit a17a57c425
5 changed files with 78 additions and 14 deletions

View File

@ -24,6 +24,7 @@ namespace DB
using Poco::SharedPtr;
typedef std::pair<BlockInputStreamPtr, std::string> ExternalTableData;
/** Соединение с сервером БД для использования в клиенте.
* Как использовать - см. Core/Protocol.h
@ -83,8 +84,8 @@ public:
const Settings * settings = NULL);
void sendCancel();
void sendData(const Block & block);
void sendTemporaryTables();
void sendData(const Block & block, const String & name = "");
void sendExternalTables(std::vector<ExternalTableData> & data);
/// Проверить, если ли данные, которые можно прочитать.
bool poll(size_t timeout_microseconds = 0);

View File

@ -65,6 +65,37 @@ public:
std::string name;
std::string format;
std::vector<std::pair<std::string, std::string> > structure;
ReadBuffer *read_buffer;
Block sample_block;
void initReadBuffer()
{
/// stdin
if (file == "-")
throw Exception("stdin as file is not supported yet", ErrorCodes::BAD_ARGUMENTS);
read_buffer = new ReadBufferFromFile(file);
}
void initSampleBlock(const Context &context)
{
for (size_t i = 0; i < structure.size(); ++i)
{
ColumnWithNameAndType column;
column.name = structure[i].first;
column.type = context.getDataTypeFactory().get(structure[i].second);
column.column = column.type->createColumn();
sample_block.insert(column);
}
}
ExternalTableData getData(const Context &context)
{
initReadBuffer();
initSampleBlock(context);
ExternalTableData res = std::make_pair(new AsynchronousBlockInputStream(context.getFormatFactory().getInput(
format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE, context.getDataTypeFactory())), name);
return res;
}
void write()
{
@ -519,11 +550,20 @@ private:
}
void sendExternalTables()
{
std::vector<ExternalTableData> data;
for (size_t i = 0; i < external_tables.size(); ++i)
data.push_back(external_tables[i].getData(context));
connection->sendExternalTables(data);
}
/// Обработать запрос, который не требует передачи блоков данных на сервер.
void processOrdinaryQuery()
{
connection->sendQuery(query, "", QueryProcessingStage::Complete);
connection->sendTemporaryTables();
sendExternalTables();
receiveResult();
}
@ -541,7 +581,7 @@ private:
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
connection->sendQuery(query_without_data, "", QueryProcessingStage::Complete);
connection->sendTemporaryTables();
sendExternalTables();
/// Получим структуру таблицы
Block sample = receiveSampleBlock();

View File

@ -243,7 +243,7 @@ void Connection::sendCancel()
}
void Connection::sendData(const Block & block)
void Connection::sendData(const Block & block, const String & name)
{
//LOG_TRACE(log, "Sending data (" << getServerAddress() << ")");
@ -260,7 +260,7 @@ void Connection::sendData(const Block & block)
writeVarUInt(Protocol::Client::Data, *out);
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPRORY_TABLES)
writeStringBinary("", *out);
writeStringBinary(name, *out);
block.checkNestedArraysOffsets();
block_out->write(block);
@ -268,13 +268,20 @@ void Connection::sendData(const Block & block)
out->next();
}
void Connection::sendTemporaryTables()
void Connection::sendExternalTables(std::vector<ExternalTableData> & data)
{
/// Если работаем со старым сервером, то никакой информации не отправляем
if (server_revision < DBMS_MIN_REVISION_WITH_TEMPRORY_TABLES)
return;
for (size_t i = 0; i < data.size(); ++i)
{
data[i].first->readPrefix();
while(Block block = data[i].first->read())
sendData(block, data[i].second);
data[i].first->readSuffix();
}
/// Отправляем пустой блок, символизируя конец передачи данных
sendData(Block());
}

View File

@ -164,6 +164,7 @@ void Context::assertDatabaseDoesntExist(const String & database_name) const
StoragePtr Context::tryGetTemporaryTable(const String & table_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
std::cerr << temporary_tables.size() << " " << table_name << std::endl;
Tables::const_iterator jt;
if (temporary_tables.end() == (jt = temporary_tables.find(table_name)))
@ -206,6 +207,16 @@ StoragePtr Context::tryGetTable(const String & database_name, const String & tab
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (database_name.empty())
{
StoragePtr res;
if (res = tryGetTemporaryTable(table_name))
return res;
if (res = session_context->tryGetTemporaryTable(table_name))
return res;
if (res = global_context->tryGetTemporaryTable(table_name))
return res;
}
String db = database_name.empty() ? current_database : database_name;
Databases::const_iterator it;

View File

@ -126,18 +126,23 @@ void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, St
/** Если таблица не указана - используем таблицу system.one.
* Если база данных не указана - используем текущую базу данных.
*/
if (query.database)
database_name = dynamic_cast<ASTIdentifier &>(*query.database).name;
if (query.table)
table_name = dynamic_cast<ASTIdentifier &>(*query.table).name;
if (!query.table)
{
database_name = "system";
table_name = "one";
}
else if (!query.database)
database_name = context.getCurrentDatabase();
if (query.database)
database_name = dynamic_cast<ASTIdentifier &>(*query.database).name;
if (query.table)
table_name = dynamic_cast<ASTIdentifier &>(*query.table).name;
{
if (context.tryGetTable("", table_name))
database_name = "";
else
database_name = context.getCurrentDatabase();
}
}