Moved file to appropriate place; split to cpp #2482

This commit is contained in:
Alexey Milovidov 2018-08-20 05:34:00 +03:00
parent f1ba2f9a33
commit e0f1637506
6 changed files with 300 additions and 239 deletions

View File

@ -20,7 +20,6 @@
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/ShellCommand.h>
#include <Common/ExternalTable.h>
#include <Common/UnicodeBar.h>
#include <Common/formatReadable.h>
#include <Common/NetException.h>
@ -31,6 +30,7 @@
#include <Common/config_version.h>
#include <Core/Types.h>
#include <Core/QueryProcessingStage.h>
#include <Core/ExternalTable.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFile.h>

View File

@ -9,7 +9,7 @@
#include <ext/scope_guard.h>
#include <Common/ExternalTable.h>
#include <Core/ExternalTable.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/getFQDNOrHostName.h>

View File

@ -6,6 +6,10 @@
#include <Common/ClickHouseRevision.h>
#include <Common/CurrentThread.h>
#include <Common/Stopwatch.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Stopwatch.h>
#include <Common/NetException.h>
#include <Common/config_version.h>
#include <IO/Progress.h>
#include <IO/CompressedReadBuffer.h>
#include <IO/CompressedWriteBuffer.h>
@ -22,11 +26,7 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <Storages/StorageMemory.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Stopwatch.h>
#include <Common/ExternalTable.h>
#include <Common/NetException.h>
#include <Common/config_version.h>
#include <Core/ExternalTable.h>
#include "TCPHandler.h"

View File

@ -1,232 +0,0 @@
#pragma once
#include <boost/program_options.hpp>
#include <boost/algorithm/string.hpp>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/Context.h>
#include <IO/copyData.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/LimitReadBuffer.h>
#include <Storages/StorageMemory.h>
#include <Client/Connection.h>
#include <Poco/Net/HTMLForm.h>
#include <Poco/Net/PartHandler.h>
#include <Poco/Net/MessageHeader.h>
#include <Common/HTMLForm.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
/// The base class containing the basic information about external table and
/// basic functions for extracting this information from text fields.
class BaseExternalTable
{
public:
std::string file; /// File with data or '-' if stdin
std::string name; /// The name of the table
std::string format; /// Name of the data storage format
/// Description of the table structure: (column name, data type name)
std::vector<std::pair<std::string, std::string>> structure;
std::unique_ptr<ReadBuffer> read_buffer;
Block sample_block;
virtual ~BaseExternalTable() {}
/// Initialize read_buffer, depending on the data source. By default, does nothing.
virtual void initReadBuffer() {}
/// Get the table data - a pair (a stream with the contents of the table, the name of the table)
ExternalTableData getData(const Context & context)
{
initReadBuffer();
initSampleBlock();
auto input = context.getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);
return std::make_pair(std::make_shared<AsynchronousBlockInputStream>(input), name);
}
protected:
/// Clear all accumulated information
void clean()
{
name = "";
file = "";
format = "";
structure.clear();
sample_block = Block();
read_buffer.reset();
}
/// Function for debugging information output
void write()
{
std::cerr << "file " << file << std::endl;
std::cerr << "name " << name << std::endl;
std::cerr << "format " << format << std::endl;
std::cerr << "structure: \n";
for (size_t i = 0; i < structure.size(); ++i)
std::cerr << "\t" << structure[i].first << " " << structure[i].second << std::endl;
}
static std::vector<std::string> split(const std::string & s, const std::string & d)
{
std::vector<std::string> res;
boost::split(res, s, boost::algorithm::is_any_of(d), boost::algorithm::token_compress_on);
return res;
}
/// Construct the `structure` vector from the text field `structure`
virtual void parseStructureFromStructureField(const std::string & argument)
{
std::vector<std::string> vals = split(argument, " ,");
if (vals.size() & 1)
throw Exception("Odd number of attributes in section structure", ErrorCodes::BAD_ARGUMENTS);
for (size_t i = 0; i < vals.size(); i += 2)
structure.emplace_back(vals[i], vals[i + 1]);
}
/// Construct the `structure` vector from the text field `types`
virtual void parseStructureFromTypesField(const std::string & argument)
{
std::vector<std::string> vals = split(argument, " ,");
for (size_t i = 0; i < vals.size(); ++i)
structure.emplace_back("_" + toString(i + 1), vals[i]);
}
private:
/// Initialize sample_block according to the structure of the table stored in the `structure`
void initSampleBlock()
{
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
for (size_t i = 0; i < structure.size(); ++i)
{
ColumnWithTypeAndName column;
column.name = structure[i].first;
column.type = data_type_factory.get(structure[i].second);
column.column = column.type->createColumn();
sample_block.insert(std::move(column));
}
}
};
/// Parsing of external table used in the tcp client.
class ExternalTable : public BaseExternalTable
{
public:
void initReadBuffer() override
{
if (file == "-")
read_buffer = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
else
read_buffer = std::make_unique<ReadBufferFromFile>(file);
}
/// Extract parameters from variables_map, which is built on the client command line
ExternalTable(const boost::program_options::variables_map & external_options)
{
if (external_options.count("file"))
file = external_options["file"].as<std::string>();
else
throw Exception("--file field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
if (external_options.count("name"))
name = external_options["name"].as<std::string>();
else
throw Exception("--name field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
if (external_options.count("format"))
format = external_options["format"].as<std::string>();
else
throw Exception("--format field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
if (external_options.count("structure"))
parseStructureFromStructureField(external_options["structure"].as<std::string>());
else if (external_options.count("types"))
parseStructureFromTypesField(external_options["types"].as<std::string>());
else
throw Exception("Neither --structure nor --types have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
}
};
/// Parsing of external table used when sending tables via http
/// The `handlePart` function will be called for each table passed,
/// so it's also necessary to call `clean` at the end of the `handlePart`.
class ExternalTablesHandler : public Poco::Net::PartHandler, BaseExternalTable
{
public:
ExternalTablesHandler(Context & context_, const Poco::Net::NameValueCollection & params_) : context(context_), params(params_) { }
void handlePart(const Poco::Net::MessageHeader & header, std::istream & stream)
{
const Settings & settings = context.getSettingsRef();
/// The buffer is initialized here, not in the virtual function initReadBuffer
read_buffer_impl = std::make_unique<ReadBufferFromIStream>(stream);
if (settings.http_max_multipart_form_data_size)
read_buffer = std::make_unique<LimitReadBuffer>(
*read_buffer_impl, settings.http_max_multipart_form_data_size,
true, "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting");
else
read_buffer = std::move(read_buffer_impl);
/// Retrieve a collection of parameters from MessageHeader
Poco::Net::NameValueCollection content;
std::string label;
Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition"), label, content);
/// Get parameters
name = content.get("name", "_data");
format = params.get(name + "_format", "TabSeparated");
if (params.has(name + "_structure"))
parseStructureFromStructureField(params.get(name + "_structure"));
else if (params.has(name + "_types"))
parseStructureFromTypesField(params.get(name + "_types"));
else
throw Exception("Neither structure nor types have not been provided for external table " + name + ". Use fields " + name + "_structure or " + name + "_types to do so.", ErrorCodes::BAD_ARGUMENTS);
ExternalTableData data = getData(context);
/// Create table
NamesAndTypesList columns = sample_block.getNamesAndTypesList();
StoragePtr storage = StorageMemory::create(data.second, ColumnsDescription{columns});
storage->startup();
context.addExternalTable(data.second, storage);
BlockOutputStreamPtr output = storage->write(ASTPtr(), settings);
/// Write data
data.first->readPrefix();
output->writePrefix();
while(Block block = data.first->read())
output->write(block);
data.first->readSuffix();
output->writeSuffix();
/// We are ready to receive the next file, for this we clear all the information received
clean();
}
private:
Context & context;
const Poco::Net::NameValueCollection & params;
std::unique_ptr<ReadBufferFromIStream> read_buffer_impl;
};
}

View File

@ -0,0 +1,182 @@
#include <boost/program_options.hpp>
#include <boost/algorithm/string.hpp>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/Context.h>
#include <IO/copyData.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/LimitReadBuffer.h>
#include <Storages/StorageMemory.h>
#include <Poco/Net/HTMLForm.h>
#include <Poco/Net/MessageHeader.h>
#include <Common/HTMLForm.h>
#include <Core/ExternalTable.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
ExternalTableData BaseExternalTable::getData(const Context & context)
{
initReadBuffer();
initSampleBlock();
auto input = context.getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);
return std::make_pair(std::make_shared<AsynchronousBlockInputStream>(input), name);
}
void BaseExternalTable::clean()
{
name = "";
file = "";
format = "";
structure.clear();
sample_block = Block();
read_buffer.reset();
}
/// Function for debugging information output
void BaseExternalTable::write()
{
std::cerr << "file " << file << std::endl;
std::cerr << "name " << name << std::endl;
std::cerr << "format " << format << std::endl;
std::cerr << "structure: \n";
for (size_t i = 0; i < structure.size(); ++i)
std::cerr << "\t" << structure[i].first << " " << structure[i].second << std::endl;
}
std::vector<std::string> BaseExternalTable::split(const std::string & s, const std::string & d)
{
std::vector<std::string> res;
boost::split(res, s, boost::algorithm::is_any_of(d), boost::algorithm::token_compress_on);
return res;
}
void BaseExternalTable::parseStructureFromStructureField(const std::string & argument)
{
std::vector<std::string> vals = split(argument, " ,");
if (vals.size() & 1)
throw Exception("Odd number of attributes in section structure", ErrorCodes::BAD_ARGUMENTS);
for (size_t i = 0; i < vals.size(); i += 2)
structure.emplace_back(vals[i], vals[i + 1]);
}
void BaseExternalTable::parseStructureFromTypesField(const std::string & argument)
{
std::vector<std::string> vals = split(argument, " ,");
for (size_t i = 0; i < vals.size(); ++i)
structure.emplace_back("_" + toString(i + 1), vals[i]);
}
void BaseExternalTable::initSampleBlock()
{
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
for (size_t i = 0; i < structure.size(); ++i)
{
ColumnWithTypeAndName column;
column.name = structure[i].first;
column.type = data_type_factory.get(structure[i].second);
column.column = column.type->createColumn();
sample_block.insert(std::move(column));
}
}
void ExternalTable::initReadBuffer()
{
if (file == "-")
read_buffer = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
else
read_buffer = std::make_unique<ReadBufferFromFile>(file);
}
ExternalTable::ExternalTable(const boost::program_options::variables_map & external_options)
{
if (external_options.count("file"))
file = external_options["file"].as<std::string>();
else
throw Exception("--file field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
if (external_options.count("name"))
name = external_options["name"].as<std::string>();
else
throw Exception("--name field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
if (external_options.count("format"))
format = external_options["format"].as<std::string>();
else
throw Exception("--format field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
if (external_options.count("structure"))
parseStructureFromStructureField(external_options["structure"].as<std::string>());
else if (external_options.count("types"))
parseStructureFromTypesField(external_options["types"].as<std::string>());
else
throw Exception("Neither --structure nor --types have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
}
void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, std::istream & stream)
{
const Settings & settings = context.getSettingsRef();
/// The buffer is initialized here, not in the virtual function initReadBuffer
read_buffer_impl = std::make_unique<ReadBufferFromIStream>(stream);
if (settings.http_max_multipart_form_data_size)
read_buffer = std::make_unique<LimitReadBuffer>(
*read_buffer_impl, settings.http_max_multipart_form_data_size,
true, "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting");
else
read_buffer = std::move(read_buffer_impl);
/// Retrieve a collection of parameters from MessageHeader
Poco::Net::NameValueCollection content;
std::string label;
Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition"), label, content);
/// Get parameters
name = content.get("name", "_data");
format = params.get(name + "_format", "TabSeparated");
if (params.has(name + "_structure"))
parseStructureFromStructureField(params.get(name + "_structure"));
else if (params.has(name + "_types"))
parseStructureFromTypesField(params.get(name + "_types"));
else
throw Exception("Neither structure nor types have not been provided for external table " + name + ". Use fields " + name + "_structure or " + name + "_types to do so.", ErrorCodes::BAD_ARGUMENTS);
ExternalTableData data = getData(context);
/// Create table
NamesAndTypesList columns = sample_block.getNamesAndTypesList();
StoragePtr storage = StorageMemory::create(data.second, ColumnsDescription{columns});
storage->startup();
context.addExternalTable(data.second, storage);
BlockOutputStreamPtr output = storage->write(ASTPtr(), settings);
/// Write data
data.first->readPrefix();
output->writePrefix();
while(Block block = data.first->read())
output->write(block);
data.first->readSuffix();
output->writeSuffix();
/// We are ready to receive the next file, for this we clear all the information received
clean();
}
}

View File

@ -0,0 +1,111 @@
#pragma once
#include <string>
#include <vector>
#include <memory>
#include <iosfwd>
#include <Poco/Net/PartHandler.h>
#include <Core/Block.h>
#include <Client/Connection.h>
#include <IO/ReadBuffer.h>
namespace Poco
{
namespace Net
{
class NameValueCollection;
class MessageHeader;
}
}
namespace boost
{
namespace program_options
{
class variables_map;
}
}
namespace DB
{
class Context;
/// The base class containing the basic information about external table and
/// basic functions for extracting this information from text fields.
class BaseExternalTable
{
public:
std::string file; /// File with data or '-' if stdin
std::string name; /// The name of the table
std::string format; /// Name of the data storage format
/// Description of the table structure: (column name, data type name)
std::vector<std::pair<std::string, std::string>> structure;
std::unique_ptr<ReadBuffer> read_buffer;
Block sample_block;
virtual ~BaseExternalTable() {}
/// Initialize read_buffer, depending on the data source. By default, does nothing.
virtual void initReadBuffer() {}
/// Get the table data - a pair (a stream with the contents of the table, the name of the table)
ExternalTableData getData(const Context & context);
protected:
/// Clear all accumulated information
void clean();
/// Function for debugging information output
void write();
static std::vector<std::string> split(const std::string & s, const std::string & d);
/// Construct the `structure` vector from the text field `structure`
virtual void parseStructureFromStructureField(const std::string & argument);
/// Construct the `structure` vector from the text field `types`
virtual void parseStructureFromTypesField(const std::string & argument);
private:
/// Initialize sample_block according to the structure of the table stored in the `structure`
void initSampleBlock();
};
/// Parsing of external table used in the tcp client.
class ExternalTable : public BaseExternalTable
{
public:
void initReadBuffer() override;
/// Extract parameters from variables_map, which is built on the client command line
ExternalTable(const boost::program_options::variables_map & external_options);
};
/// Parsing of external table used when sending tables via http
/// The `handlePart` function will be called for each table passed,
/// so it's also necessary to call `clean` at the end of the `handlePart`.
class ExternalTablesHandler : public Poco::Net::PartHandler, BaseExternalTable
{
public:
ExternalTablesHandler(Context & context_, const Poco::Net::NameValueCollection & params_) : context(context_), params(params_) {}
void handlePart(const Poco::Net::MessageHeader & header, std::istream & stream);
private:
Context & context;
const Poco::Net::NameValueCollection & params;
std::unique_ptr<ReadBuffer> read_buffer_impl;
};
}