From 0ded5c5040c2de7c66ea1ea0ed5945dc42112138 Mon Sep 17 00:00:00 2001 From: Sergey Fedorov Date: Tue, 8 Apr 2014 17:43:20 +0400 Subject: [PATCH] dbms: sending external table throw http [METR-10071] --- dbms/include/DB/Common/ExternalTable.h | 239 +++++++++++++++++++++++++ dbms/src/Client/Client.cpp | 125 +------------ dbms/src/Server/HTTPHandler.cpp | 49 +++-- dbms/src/Server/HTTPHandler.h | 1 + dbms/src/Server/TCPHandler.cpp | 2 + 5 files changed, 278 insertions(+), 138 deletions(-) create mode 100644 dbms/include/DB/Common/ExternalTable.h diff --git a/dbms/include/DB/Common/ExternalTable.h b/dbms/include/DB/Common/ExternalTable.h new file mode 100644 index 00000000000..b6a5f0f874a --- /dev/null +++ b/dbms/include/DB/Common/ExternalTable.h @@ -0,0 +1,239 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +/// Базовый класс содержащий основную информацию о внешней таблице и +/// основные функции для извлечения этой информации из текстовых полей. +class BaseExternalTable +{ +public: + std::string file; /// Файл с данными или '-' если stdin + std::string name; /// Имя таблицы + std::string format; /// Название формата хранения данных + + /// Описание структуры таблицы: (имя столбца, имя типа данных) + std::vector > structure; + + ReadBuffer *read_buffer; + Block sample_block; + + virtual ~BaseExternalTable() {}; + + /// Инициализировать read_buffer в зависимости от источника данных. По умолчанию не делает ничего. + virtual void initReadBuffer() {}; + + /// Инициализировать sample_block по структуре таблицы сохраненной в structure + virtual void initSampleBlock(const Context &context) + { + for (size_t i = 0; i < structure.size(); ++i) + { + ColumnWithNameAndType column; + column.name = structure[i].first; + column.type = context.getDataTypeFactory().get(structure[i].second); + column.column = column.type->createColumn(); + sample_block.insert(column); + } + } + + /// Получить данные таблицы - пару (поток с содержимым таблицы, имя таблицы) + virtual ExternalTableData getData(const Context &context) + { + initReadBuffer(); + initSampleBlock(context); + ExternalTableData res = std::make_pair(new AsynchronousBlockInputStream(context.getFormatFactory().getInput( + format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE, context.getDataTypeFactory())), name); + return res; + } + +protected: + /// Очистить всю накопленную информацию + void clean() + { + name = ""; + file = ""; + format = ""; + structure.clear(); + sample_block = Block(); + read_buffer = NULL; + } + + /// Функция для отладочного вывода информации + virtual void write() + { + std::cerr << "file " << file << std::endl; + std::cerr << "name " << name << std::endl; + std::cerr << "format " << format << std::endl; + std::cerr << "structure: \n"; + for (size_t i = 0; i < structure.size(); ++i) + std::cerr << "\t" << structure[i].first << " " << structure[i].second << std::endl; + } + + static std::vector split(const std::string & s, const std::string &d) + { + std::vector res; + std::string now; + for (size_t i = 0; i < s.size(); ++i) + { + if (d.find(s[i]) != std::string::npos) + { + if (!now.empty()) + res.push_back(now); + now = ""; + continue; + } + now += s[i]; + } + if (!now.empty()) + res.push_back(now); + return res; + } + + /// Построить вектор structure по текстовому полю structure + virtual void parseStructureFromStructureField(const std::string & argument) + { + std::vector vals = split(argument, " ,"); + + if (vals.size() & 1) + throw Exception("Odd number of attributes in section structure", ErrorCodes::BAD_ARGUMENTS); + + for (size_t i = 0; i < vals.size(); i += 2) + structure.push_back(std::make_pair(vals[i], vals[i+1])); + } + + /// Построить вектор structure по текстовому полю types + virtual void parseStructureFromTypesField(const std::string & argument) + { + std::vector vals = split(argument, " ,"); + + for (size_t i = 0; i < vals.size(); ++i) + structure.push_back(std::make_pair("_" + toString(i + 1), vals[i])); + } +}; + + +/// Парсинг внешей таблицы, используемый в tcp клиенте. +class ExternalTable : public BaseExternalTable +{ +public: + void initReadBuffer() + { + if (file == "-") + read_buffer = new ReadBufferFromIStream(std::cin); + else + read_buffer = new ReadBufferFromFile(file); + } + + /// Извлечение параметров из variables_map, которая строится по командной строке клиента + ExternalTable(const boost::program_options::variables_map & external_options) + { + if (external_options.count("file")) + file = external_options["file"].as(); + else + throw Exception("--file field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS); + + if (external_options.count("name")) + name = external_options["name"].as(); + else + throw Exception("--name field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS); + + if (external_options.count("format")) + format = external_options["format"].as(); + else + throw Exception("--format field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS); + + if (external_options.count("structure")) + { + std::vector temp = external_options["structure"].as>(); + + std::string argument; + for (size_t i = 0; i < temp.size(); ++i) + argument = argument + temp[i] + " "; + + parseStructureFromStructureField(argument); + + } + else if (external_options.count("types")) + { + std::vector temp = external_options["types"].as>(); + std::string argument; + for (size_t i = 0; i < temp.size(); ++i) + argument = argument + temp[i] + " "; + parseStructureFromTypesField(argument); + } + else + throw Exception("Neither --structure nor --types have not been provided for external table", ErrorCodes::BAD_ARGUMENTS); + } +}; + +/// Парсинг внешей таблицы, используемый при отправке таблиц через http +/// Функция handlePart будет вызываться для каждой переданной таблицы, +/// поэтому так же необходимо вызывать clean в конце handlePart. +class ExternalTablesHandler : public Poco::Net::PartHandler, BaseExternalTable +{ +public: + std::vector names; + + ExternalTablesHandler(Context & context_, Poco::Net::NameValueCollection params_) : context(context_), params(params_) { } + + void handlePart(const Poco::Net::MessageHeader& header, std::istream& stream) + { + /// Буфер инициализируется здесь, а не в виртуальной функции initReadBuffer + read_buffer = new ReadBufferFromIStream(stream); + + /// Извлекаем коллекцию параметров из MessageHeader + Poco::Net::NameValueCollection content; + std::string label; + Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition"), label, content); + + /// Получаем параметры + name = content.get("name", "_data"); + format = params.get("format" + name, "TabSeparated"); + + if (params.has("structure" + name)) + parseStructureFromStructureField(params.get("structure" + name)); + else if (params.has("types" + name)) + parseStructureFromTypesField(params.get("types" + name)); + else + throw Exception("Neither structure nor types have not been provided for external table " + name + ". Use fields structure" + name + " or types" + name + " to do so.", ErrorCodes::BAD_ARGUMENTS); + + ExternalTableData data = getData(context); + + /// Создаем таблицу + NamesAndTypesListPtr columns = new NamesAndTypesList(sample_block.getColumnsList()); + StoragePtr storage = StorageMemory::create(data.second, columns); + context.addExternalTable(data.second, storage); + BlockOutputStreamPtr output = storage->write(ASTPtr()); + + /// Записываем данные + data.first->readPrefix(); + output->writePrefix(); + while(Block block = data.first->read()) + output->write(block); + data.first->readSuffix(); + output->writeSuffix(); + + names.push_back(name); + /// Подготавливаемся к приему следующего файла, для этого очищаем всю полученную информацию + clean(); + } + +private: + Context & context; + Poco::Net::NameValueCollection params; +}; + + +} diff --git a/dbms/src/Client/Client.cpp b/dbms/src/Client/Client.cpp index 98c4cd224cf..542149bd54c 100644 --- a/dbms/src/Client/Client.cpp +++ b/dbms/src/Client/Client.cpp @@ -48,6 +48,7 @@ #include "InterruptListener.h" +#include /** Клиент командной строки СУБД ClickHouse. */ @@ -58,130 +59,6 @@ namespace DB using Poco::SharedPtr; -/// Описание внешней таблицы -class ExternalTable -{ -public: - std::string file; /// Файл с данными или '-' если stdin - std::string name; /// Имя таблицы - std::string format; /// Название формата хранения данных - - /// Описание структуры таблицы: (имя столбца, имя типа данных) - std::vector > structure; - - ReadBuffer *read_buffer; - Block sample_block; - - void initReadBuffer() - { - if (file == "-") - read_buffer = new ReadBufferFromIStream(std::cin); - else - read_buffer = new ReadBufferFromFile(file); - } - - void initSampleBlock(const Context &context) - { - for (size_t i = 0; i < structure.size(); ++i) - { - ColumnWithNameAndType column; - column.name = structure[i].first; - column.type = context.getDataTypeFactory().get(structure[i].second); - column.column = column.type->createColumn(); - sample_block.insert(column); - } - } - - ExternalTableData getData(const Context &context) - { - initReadBuffer(); - initSampleBlock(context); - ExternalTableData res = std::make_pair(new AsynchronousBlockInputStream(context.getFormatFactory().getInput( - format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE, context.getDataTypeFactory())), name); - return res; - } - - /// Функция для отладочного вывода информации - void write() - { - std::cerr << "file " << file << std::endl; - std::cerr << "name " << name << std::endl; - std::cerr << "format " << format << std::endl; - std::cerr << "structure: \n"; - for (size_t i = 0; i < structure.size(); ++i) - std::cerr << "\t" << structure[i].first << " " << structure[i].second << std::endl; - } - - /// Извлечение параметров из variables_map, которая строится по командной строке - ExternalTable(const boost::program_options::variables_map & external_options) - { - if (external_options.count("file")) - file = external_options["file"].as(); - else - throw Exception("--file field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS); - - if (external_options.count("name")) - name = external_options["name"].as(); - else - throw Exception("--name field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS); - - if (external_options.count("format")) - format = external_options["format"].as(); - else - throw Exception("--format field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS); - - if (external_options.count("structure")) - { - std::vector temp = external_options["structure"].as>(); - - std::string argument; - for (size_t i = 0; i < temp.size(); ++i) - argument = argument + temp[i] + " "; - std::vector vals = split(argument, " ,"); - - if (vals.size() & 1) - throw Exception("Odd number of attributes in section structure", ErrorCodes::BAD_ARGUMENTS); - - for (size_t i = 0; i < vals.size(); i += 2) - structure.push_back(std::make_pair(vals[i], vals[i+1])); - } - else if (external_options.count("types")) - { - std::vector temp = external_options["types"].as>(); - std::string argument; - for (size_t i = 0; i < temp.size(); ++i) - argument = argument + temp[i] + " "; - std::vector vals = split(argument, " ,"); - - for (size_t i = 0; i < vals.size(); ++i) - structure.push_back(std::make_pair("_" + toString(i + 1), vals[i])); - } - else - throw Exception("Neither --structure nor --types have not been provided for external table", ErrorCodes::BAD_ARGUMENTS); - } - - static std::vector split(const std::string & s, const std::string &d) - { - std::vector res; - std::string now; - for (size_t i = 0; i < s.size(); ++i) - { - if (d.find(s[i]) != std::string::npos) - { - if (!now.empty()) - res.push_back(now); - now = ""; - continue; - } - now += s[i]; - } - if (!now.empty()) - res.push_back(now); - return res; - } -}; - - class Client : public Poco::Util::Application { public: diff --git a/dbms/src/Server/HTTPHandler.cpp b/dbms/src/Server/HTTPHandler.cpp index 70b3976dcf3..c080f058559 100644 --- a/dbms/src/Server/HTTPHandler.cpp +++ b/dbms/src/Server/HTTPHandler.cpp @@ -19,6 +19,8 @@ #include +#include + #include "HTTPHandler.h" @@ -26,7 +28,6 @@ namespace DB { - void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, String query_id) { LOG_TRACE(log, "Request URI: " << request.getURI()); @@ -45,18 +46,6 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net if (!query_param.empty()) query_param += '\n'; - ReadBufferFromString in_param(query_param); - SharedPtr in_post = new ReadBufferFromIStream(istr); - SharedPtr in_post_maybe_compressed; - - /// Если указано decompress, то будем разжимать то, что передано POST-ом. - if (parse(params.get("decompress", "0"))) - in_post_maybe_compressed = new CompressedReadBuffer(*in_post); - else - in_post_maybe_compressed = in_post; - - ConcatReadBuffer in(in_param, *in_post_maybe_compressed); - /// Если указано compress, то будем сжимать результат. SharedPtr out = new WriteBufferFromHTTPServerResponse(response); SharedPtr out_maybe_compressed; @@ -86,6 +75,38 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net context.setUser(user, password, request.clientAddress().host(), quota_key); context.setCurrentQueryId(query_id); + SharedPtr in_param = new ReadBufferFromString(query_param); + SharedPtr in_post = new ReadBufferFromIStream(istr); + SharedPtr in_post_maybe_compressed; + + /// Если указано decompress, то будем разжимать то, что передано POST-ом. + if (parse(params.get("decompress", "0"))) + in_post_maybe_compressed = new CompressedReadBuffer(*in_post); + else + in_post_maybe_compressed = in_post; + + SharedPtr in; + std::string content_type = request.getContentType(); + if (content_type.length() > strlen("multipart/form-data")) + content_type.resize(strlen("multipart/form-data")); + if (strcmp(content_type.data(), "multipart/form-data") == 0) + { + in = in_param; + ExternalTablesHandler handler(context, params); + + params.load(request, istr, handler); + + /// Удаляем уже нененужные параметры из хранилища, чтобы впоследствии не перепутать их с натройками контекста и параметрами запроса. + for (const auto & it : handler.names) + { + params.erase("format" + it); + params.erase("types" + it); + params.erase("structure" + it); + } + } + else + in = new ConcatReadBuffer(*in_param, *in_post_maybe_compressed); + /// Настройки могут быть переопределены в запросе. for (Poco::Net::NameValueCollection::ConstIterator it = params.begin(); it != params.end(); ++it) { @@ -116,7 +137,7 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net context.getSettingsRef().limits.readonly = true; Stopwatch watch; - executeQuery(in, *out_maybe_compressed, context, query_plan); + executeQuery(*in, *out_maybe_compressed, context, query_plan); watch.stop(); if (query_plan) diff --git a/dbms/src/Server/HTTPHandler.h b/dbms/src/Server/HTTPHandler.h index bf5c357f131..2b7098daebd 100644 --- a/dbms/src/Server/HTTPHandler.h +++ b/dbms/src/Server/HTTPHandler.h @@ -6,6 +6,7 @@ namespace DB { + class HTTPHandler : public Poco::Net::HTTPRequestHandler { public: diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 48e1ebdf372..76c0fdf46e4 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -23,6 +23,8 @@ #include +#include + #include "TCPHandler.h"