Merge branch 'master' of https://github.com/alesapin/ClickHouse into alesapin-master

This commit is contained in:
Alexey Milovidov 2018-06-16 08:29:00 +03:00
commit b108882e2e
21 changed files with 726 additions and 140 deletions

View File

@ -1,9 +1,12 @@
#include <IO/HTTPCommon.h>
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
#include <Common/config.h>
#if USE_POCO_NETSSL
#include <Poco/Net/AcceptCertificateHandler.h>
#include <Poco/Net/Context.h>
#include <Poco/Net/HTTPSClientSession.h>
#include <Poco/Net/InvalidCertificateHandler.h>
#include <Poco/Net/PrivateKeyPassphraseHandler.h>
#include <Poco/Net/RejectCertificateHandler.h>
@ -12,9 +15,16 @@
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/Application.h>
#include <sstream>
namespace DB
{
namespace ErrorCodes
{
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
}
void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout)
{
if (!response.getKeepAlive())
@ -34,4 +44,46 @@ void SSLInit()
Poco::Net::initializeSSL();
#endif
}
std::unique_ptr<Poco::Net::HTTPClientSession> getPreparedSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts)
{
bool is_ssl = static_cast<bool>(uri.getScheme() == "https");
std::unique_ptr<Poco::Net::HTTPClientSession> session(
#if USE_POCO_NETSSL
is_ssl ? new Poco::Net::HTTPSClientSession :
#endif
new Poco::Net::HTTPClientSession);
session->setHost(DNSResolver::instance().resolveHost(uri.getHost()).toString());
session->setPort(uri.getPort());
#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000
session->setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
#else
session->setTimeout(timeouts.connection_timeout);
#endif
return session;
}
std::istream * makeRequest(
Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response)
{
auto istr = &session.receiveResponse(response);
auto status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream error_message;
error_message << "Received error from remote server " << request.getURI() << ". HTTP status code: " << status << " "
<< response.getReason() << ", body: " << istr->rdbuf();
throw Exception(error_message.str(),
status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
: ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
return istr;
}
}

View File

@ -1,6 +1,16 @@
#pragma once
#include <mutex>
#include <memory>
#include <iostream>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/URI.h>
#include <IO/ConnectionTimeouts.h>
namespace Poco
{
@ -14,9 +24,18 @@ namespace Poco
namespace DB
{
const int HTTP_TOO_MANY_REQUESTS = 429;
void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout);
extern std::once_flag ssl_init_once;
void SSLInit();
std::unique_ptr<Poco::Net::HTTPClientSession> getPreparedSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts);
/* Function makes HTTP-request from prepared structures and returns response istream
* in case of HTTP_OK and throws exception with details in case of not HTTP_OK
*/
std::istream* makeRequest(Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response);
}

View File

@ -8,19 +8,10 @@
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Version.h>
#include <common/logger_useful.h>
#if USE_POCO_NETSSL
#include <Poco/Net/HTTPSClientSession.h>
#endif
#include <IO/HTTPCommon.h>
namespace DB
{
namespace ErrorCodes
{
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
}
ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
@ -31,25 +22,8 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
: ReadBuffer(nullptr, 0),
uri{uri},
method{!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET},
timeouts{timeouts},
is_ssl{uri.getScheme() == "https"},
session
session{getPreparedSession(uri, timeouts)}
{
std::unique_ptr<Poco::Net::HTTPClientSession>(
#if USE_POCO_NETSSL
is_ssl ? new Poco::Net::HTTPSClientSession :
#endif
new Poco::Net::HTTPClientSession)
}
{
session->setHost(DNSResolver::instance().resolveHost(uri.getHost()).toString());
session->setPort(uri.getPort());
#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000
session->setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
#else
session->setTimeout(timeouts.connection_timeout);
#endif
Poco::Net::HTTPRequest request(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(uri.getHost()); // use original, not resolved host name in header
@ -66,20 +40,7 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
if (out_stream_callback)
out_stream_callback(stream_out);
istr = &session->receiveResponse(response);
auto status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream error_message;
error_message << "Received error from remote server " << uri.toString() << ". HTTP status code: " << status << " "
<< response.getReason() << ", body: " << istr->rdbuf();
throw Exception(error_message.str(),
status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
: ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
istr = makeRequest(*session, request, response);
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
}

View File

@ -12,9 +12,6 @@
namespace DB
{
const int HTTP_TOO_MANY_REQUESTS = 429;
/** Perform HTTP POST request and provide response to read.
*/
class ReadWriteBufferFromHTTP : public ReadBuffer
@ -22,9 +19,7 @@ class ReadWriteBufferFromHTTP : public ReadBuffer
private:
Poco::URI uri;
std::string method;
ConnectionTimeouts timeouts;
bool is_ssl;
std::unique_ptr<Poco::Net::HTTPClientSession> session;
std::istream * istr; /// owned by session
std::unique_ptr<ReadBuffer> impl;

View File

@ -0,0 +1,25 @@
#include <IO/WriteBufferFromHTTP.h>
#include <IO/HTTPCommon.h>
#include <common/logger_useful.h>
namespace DB
{
WriteBufferFromHTTP::WriteBufferFromHTTP(
const Poco::URI & uri, const std::string & method, const ConnectionTimeouts & timeouts, size_t buffer_size_)
: WriteBufferFromOStream(buffer_size_)
, session{getPreparedSession(uri, timeouts)}
, request{method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1}
{
request.setHost(uri.getHost());
request.setChunkedTransferEncoding(true);
LOG_TRACE((&Logger::get("WriteBufferToHTTP")), "Sending request to " << uri.toString());
ostr = &session->sendRequest(request);
}
void WriteBufferFromHTTP::finalize()
{
makeRequest(*session, request, response);
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <IO/ConnectionTimeouts.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromOStream.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/URI.h>
namespace DB
{
/* Perform HTTP POST/PUT request.
*/
class WriteBufferFromHTTP : public WriteBufferFromOStream
{
private:
std::unique_ptr<Poco::Net::HTTPClientSession> session;
Poco::Net::HTTPRequest request;
Poco::Net::HTTPResponse response;
public:
explicit WriteBufferFromHTTP(const Poco::URI & uri,
const std::string & method = Poco::Net::HTTPRequest::HTTP_POST, // POST or PUT only
const ConnectionTimeouts & timeouts = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
~WriteBufferFromHTTP() override {}
// This method have to be called, to make actual request
void finalize();
};
}

View File

@ -18,28 +18,34 @@ namespace ErrorCodes
class WriteBufferFromOStream : public BufferWithOwnMemory<WriteBuffer>
{
private:
std::ostream & ostr;
protected:
std::ostream * ostr;
void nextImpl() override
{
if (!offset())
return;
ostr.write(working_buffer.begin(), offset());
ostr.flush();
ostr->write(working_buffer.begin(), offset());
ostr->flush();
if (!ostr.good())
if (!ostr->good())
throw Exception("Cannot write to ostream", ErrorCodes::CANNOT_WRITE_TO_OSTREAM);
}
WriteBufferFromOStream(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: BufferWithOwnMemory<WriteBuffer>(size, existing_memory, alignment)
{
}
public:
WriteBufferFromOStream(
std::ostream & ostr_,
size_t size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0)
: BufferWithOwnMemory<WriteBuffer>(size, existing_memory, alignment), ostr(ostr_) {}
std::ostream & ostr_,
size_t size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0)
: BufferWithOwnMemory<WriteBuffer>(size, existing_memory, alignment), ostr(&ostr_) {}
~WriteBufferFromOStream() override
{

View File

@ -3,6 +3,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <Common/typeid_cast.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/File.h>
#include <ext/scope_guard.h>

View File

@ -0,0 +1,172 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageURL.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteBufferFromHTTP.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Poco/Net/HTTPRequest.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
};
StorageURL::StorageURL(const Poco::URI & uri_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
Context & context_)
: IStorage(columns_), uri(uri_), format_name(format_name_), table_name(table_name_), context_global(context_)
{
}
namespace
{
class StorageURLBlockInputStream : public IProfilingBlockInputStream
{
public:
StorageURLBlockInputStream(const Poco::URI & uri,
const String & format,
const String & name_,
const Block & sample_block,
const Context & context,
size_t max_block_size,
const ConnectionTimeouts & timeouts)
: name(name_)
{
read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_GET, nullptr, timeouts);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
}
String getName() const override
{
return name;
}
Block readImpl() override
{
return reader->read();
}
Block getHeader() const override
{
return reader->getHeader();
}
void readPrefixImpl() override
{
reader->readPrefix();
}
void readSuffixImpl() override
{
reader->readSuffix();
}
private:
String name;
std::unique_ptr<ReadWriteBufferFromHTTP> read_buf;
BlockInputStreamPtr reader;
};
class StorageURLBlockOutputStream : public IBlockOutputStream
{
public:
StorageURLBlockOutputStream(const Poco::URI & uri,
const String & format,
const Block & sample_block_,
Context & context,
const ConnectionTimeouts & timeouts)
: sample_block(sample_block_)
{
write_buf = std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
Block getHeader() const override
{
return sample_block;
}
void write(const Block & block) override
{
writer->write(block);
}
void writePrefix() override {
writer->writePrefix();
}
void writeSuffix() override {
writer->writeSuffix();
writer->flush();
write_buf->finalize();
}
private:
Block sample_block;
std::unique_ptr<WriteBufferFromHTTP> write_buf;
BlockOutputStreamPtr writer;
};
}
BlockInputStreams StorageURL::read(const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t max_block_size,
unsigned /*num_streams*/)
{
return {std::make_shared<StorageURLBlockInputStream>(
uri,
format_name,
getName(),
getSampleBlock(),
context,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))};
}
void StorageURL::rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) {}
BlockOutputStreamPtr StorageURL::write(const ASTPtr & /*query*/, const Settings & /*settings*/)
{
return std::make_shared<StorageURLBlockOutputStream>(
uri, format_name, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global.getSettingsRef()));
}
void registerStorageURL(StorageFactory & factory)
{
factory.registerStorage("URL", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (!(engine_args.size() == 1 || engine_args.size() == 2))
throw Exception(
"Storage URL requires exactly 2 arguments: url and name of used format.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
String url = static_cast<const ASTLiteral &>(*engine_args[0]).value.safeGet<String>();
Poco::URI uri(url);
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);
String format_name = static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>();
return StorageURL::create(uri, args.table_name, format_name, args.columns, args.context);
});
}
}

View File

@ -0,0 +1,55 @@
#pragma once
#include <Storages/IStorage.h>
#include <Poco/URI.h>
#include <common/logger_useful.h>
#include <ext/shared_ptr_helper.h>
namespace DB
{
/**
* This class represents table engine for external urls.
* It sends HTTP GET to server when select is called and
* HTTP POST when insert is called. In POST request the data is send
* using Chunked transfer encoding, so server have to support it.
*/
class StorageURL : public ext::shared_ptr_helper<StorageURL>, public IStorage
{
public:
String getName() const override
{
return "URL";
}
String getTableName() const override
{
return table_name;
}
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
protected:
StorageURL(const Poco::URI & uri_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
Context & context_);
private:
Poco::URI uri;
String format_name;
String table_name;
Context & context_global;
Logger * log = &Logger::get("StorageURL");
};
}

View File

@ -17,6 +17,7 @@ void registerStorageBuffer(StorageFactory & factory);
void registerStorageDistributed(StorageFactory & factory);
void registerStorageMemory(StorageFactory & factory);
void registerStorageFile(StorageFactory & factory);
void registerStorageURL(StorageFactory & factory);
void registerStorageDictionary(StorageFactory & factory);
void registerStorageSet(StorageFactory & factory);
void registerStorageJoin(StorageFactory & factory);
@ -50,6 +51,7 @@ void registerStorages()
registerStorageDistributed(factory);
registerStorageMemory(factory);
registerStorageFile(factory);
registerStorageURL(factory);
registerStorageDictionary(factory);
registerStorageSet(factory);
registerStorageJoin(factory);

View File

@ -0,0 +1,70 @@
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/ITableFunctionFileLike.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <Storages/StorageFile.h>
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <boost/algorithm/string.hpp>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, const Context & context) const
{
// Parse args
ASTs & args_func = typeid_cast<ASTFunction &>(*ast_function).children;
if (args_func.size() != 1)
throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::LOGICAL_ERROR);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 3)
throw Exception("Table function '" + getName() + "' requires exactly 3 arguments: source, format and structure.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < 3; ++i)
args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context);
std::string source = static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>();
std::string format = static_cast<const ASTLiteral &>(*args[1]).value.safeGet<String>();
std::string structure = static_cast<const ASTLiteral &>(*args[2]).value.safeGet<String>();
// Create sample block
std::vector<std::string> structure_vals;
boost::split(structure_vals, structure, boost::algorithm::is_any_of(" ,"), boost::algorithm::token_compress_on);
if (structure_vals.size() % 2 != 0)
throw Exception("Odd number of elements in section structure: must be a list of name type pairs", ErrorCodes::LOGICAL_ERROR);
Block sample_block;
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
for (size_t i = 0, size = structure_vals.size(); i < size; i += 2)
{
ColumnWithTypeAndName column;
column.name = structure_vals[i];
column.type = data_type_factory.get(structure_vals[i + 1]);
column.column = column.type->createColumn();
sample_block.insert(std::move(column));
}
// Create table
StoragePtr storage = getStorage(source, format, sample_block, const_cast<Context &>(context));
storage->startup();
return storage;
}
}

View File

@ -0,0 +1,19 @@
#pragma once
#include <TableFunctions/ITableFunction.h>
#include <Interpreters/Context.h>
#include <Core/Block.h>
namespace DB
{
/*
* function(source, format, structure) - creates a temporary storage from formated source
*/
class ITableFunctionFileLike : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
virtual StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const = 0;
};
}

View File

@ -1,78 +1,23 @@
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFile.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <Storages/StorageFile.h>
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <boost/algorithm/string.hpp>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionFile.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int DATABASE_ACCESS_DENIED;
}
StoragePtr TableFunctionFile::executeImpl(const ASTPtr & ast_function, const Context & context) const
{
// Parse args
ASTs & args_func = typeid_cast<ASTFunction &>(*ast_function).children;
if (args_func.size() != 1)
throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::LOGICAL_ERROR);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 3)
throw Exception("Table function '" + getName() + "' requires exactly 3 arguments: path, format and structure.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < 3; ++i)
args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context);
std::string path = static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>();
std::string format = static_cast<const ASTLiteral &>(*args[1]).value.safeGet<String>();
std::string structure = static_cast<const ASTLiteral &>(*args[2]).value.safeGet<String>();
// Create sample block
std::vector<std::string> structure_vals;
boost::split(structure_vals, structure, boost::algorithm::is_any_of(" ,"), boost::algorithm::token_compress_on);
if (structure_vals.size() % 2 != 0)
throw Exception("Odd number of elements in section structure: must be a list of name type pairs", ErrorCodes::LOGICAL_ERROR);
Block sample_block;
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
for (size_t i = 0, size = structure_vals.size(); i < size; i += 2)
{
ColumnWithTypeAndName column;
column.name = structure_vals[i];
column.type = data_type_factory.get(structure_vals[i + 1]);
column.column = column.type->createColumn();
sample_block.insert(std::move(column));
}
// Create table
StoragePtr storage = StorageFile::create(
path, -1, context.getUserFilesPath(), getName(), format,
ColumnsDescription{sample_block.getNamesAndTypesList()}, const_cast<Context &>(context));
storage->startup();
return storage;
}
void registerTableFunctionFile(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionFile>();
}
StoragePtr TableFunctionFile::getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const
{
return StorageFile::create(source,
-1,
global_context.getUserFilesPath(),
getName(),
format,
ColumnsDescription{sample_block.getNamesAndTypesList()},
global_context);
}
void registerTableFunctionFile(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionFile>();
}
}

View File

@ -1,25 +1,29 @@
#pragma once
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/ITableFunctionFileLike.h>
#include <Interpreters/Context.h>
#include <Core/Block.h>
namespace DB
{
/* file(path, format, structure) - creates a temporary storage from file
*
*
* The file must be in the clickhouse data directory.
* The relative path begins with the clickhouse data directory.
*/
class TableFunctionFile : public ITableFunction
class TableFunctionFile : public ITableFunctionFileLike
{
public:
static constexpr auto name = "file";
std::string getName() const override
{
public:
static constexpr auto name = "file";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context) const override;
};
return name;
}
private:
StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const override;
};
}

View File

@ -0,0 +1,19 @@
#include <Storages/StorageURL.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionURL.h>
#include <Poco/URI.h>
namespace DB
{
StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const
{
Poco::URI uri(source);
return StorageURL::create(uri, getName(), format, ColumnsDescription{sample_block.getNamesAndTypesList()}, global_context);
}
void registerTableFunctionURL(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionURL>();
}
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <TableFunctions/ITableFunctionFileLike.h>
#include <Interpreters/Context.h>
#include <Core/Block.h>
namespace DB
{
/* url(source, format, structure) - creates a temporary storage from url
*/
class TableFunctionURL : public ITableFunctionFileLike
{
public:
static constexpr auto name = "url";
std::string getName() const override
{
return name;
}
private:
StoragePtr getStorage(
const String & source, const String & format, const Block & sample_block, Context & global_context) const override;
};
}

View File

@ -12,6 +12,7 @@ void registerTableFunctionShardByHash(TableFunctionFactory & factory);
void registerTableFunctionNumbers(TableFunctionFactory & factory);
void registerTableFunctionCatBoostPool(TableFunctionFactory & factory);
void registerTableFunctionFile(TableFunctionFactory & factory);
void registerTableFunctionURL(TableFunctionFactory & factory);
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
void registerTableFunctionODBC(TableFunctionFactory & factory);
@ -32,6 +33,7 @@ void registerTableFunctions()
registerTableFunctionNumbers(factory);
registerTableFunctionCatBoostPool(factory);
registerTableFunctionFile(factory);
registerTableFunctionURL(factory);
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
registerTableFunctionODBC(factory);

View File

@ -0,0 +1,172 @@
#!/usr/bin/env python
from __future__ import print_function
import csv
import sys
import tempfile
import threading
import os, urllib
from io import StringIO
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
SERVER_ADDRESS = ('127.0.0.1', 51234)
SERVER_ADDRESS_STR = 'http://' + ':'.join(str(s) for s in SERVER_ADDRESS) + "/"
CSV_DATA = os.path.join(tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
def get_ch_answer(query):
return urllib.urlopen(os.environ.get('CLICKHOUSE_URL', 'http://localhost:' + os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')), data=query).read()
def check_answers(query, answer):
ch_answer = get_ch_answer(query)
if ch_answer.strip() != answer.strip():
print("FAIL on query:", query, file=sys.stderr)
print("Expected answer:", answer, file=sys.stderr)
print("Fetched answer :", ch_answer, file=sys.stderr)
raise Exception("Fail on query")
class CSVHTTPServer(BaseHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'text/csv')
self.end_headers()
def do_GET(self):
self._set_headers()
with open(CSV_DATA, 'r') as fl:
reader = csv.reader(fl, delimiter=',')
for row in reader:
self.wfile.write(', '.join(row) + '\n')
return
def read_chunk(self):
msg = ''
while True:
sym = self.rfile.read(1)
if sym == '':
break
msg += sym.decode('utf-8')
if msg.endswith('\r\n'):
break
length = int(msg[:-2], 16)
if length == 0:
return ''
content = self.rfile.read(length)
self.rfile.read(2) # read sep \r\n
return content.decode('utf-8')
def do_POST(self):
data = ''
while True:
chunk = self.read_chunk()
if not chunk:
break
data += chunk
text = ""
with StringIO(data) as fl:
reader = csv.reader(fl, delimiter=',')
with open(CSV_DATA, 'a') as d:
for row in reader:
d.write(','.join(row) + '\n')
self._set_headers()
self.wfile.write("ok")
def log_message(self, format, *args):
return
def start_server(requests_amount):
httpd = HTTPServer(SERVER_ADDRESS, CSVHTTPServer)
def real_func():
for i in xrange(requests_amount):
httpd.handle_request()
t = threading.Thread(target=real_func)
return t
# test section
def test_select(table_name="", schema="str String,numuint UInt32,numint Int32,double Float64", requests=[], answers=[], test_data=""):
with open(CSV_DATA, 'w') as f: # clear file
f.write('')
if test_data:
with open(CSV_DATA, 'w') as f:
f.write(test_data + "\n")
if table_name:
get_ch_answer("drop table if exists {}".format(table_name))
get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, SERVER_ADDRESS_STR))
for i in xrange(len(requests)):
tbl = table_name
if not tbl:
tbl = "url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema)
check_answers(requests[i].format(tbl=tbl), answers[i])
if table_name:
get_ch_answer("drop table if exists {}".format(table_name))
def test_insert(table_name="", schema="str String,numuint UInt32,numint Int32,double Float64", requests_insert=[], requests_select=[], answers=[]):
with open(CSV_DATA, 'w') as f: # flush test file
f.write('')
if table_name:
get_ch_answer("drop table if exists {}".format(table_name))
get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, SERVER_ADDRESS_STR))
for req in requests_insert:
tbl = table_name
if not tbl:
tbl = "table function url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema)
get_ch_answer(req.format(tbl=tbl))
for i in xrange(len(requests_select)):
tbl = table_name
if not tbl:
tbl = "url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema)
check_answers(requests_select[i].format(tbl=tbl), answers[i])
if table_name:
get_ch_answer("drop table if exists {}".format(table_name))
def main():
test_data = "Hello,2,-2,7.7\nWorld,2,-5,8.8"
select_only_requests = {
"select str,numuint,numint,double from {tbl}" : test_data.replace(',', '\t'),
"select numuint, count(*) from {tbl} group by numuint" : "2\t2",
"select str,numuint,numint,double from {tbl} limit 1": test_data.split("\n")[0].replace(',', '\t'),
}
insert_requests = [
"insert into {tbl} values('Hello',10,-2,7.7)('World',10,-5,7.7)",
"insert into {tbl} select 'Buy', number, 9-number, 9.9 from system.numbers limit 10",
]
select_requests = {
"select distinct numuint from {tbl} order by numuint": '\n'.join([str(i) for i in xrange(11)]),
"select count(*) from {tbl}": '12',
'select double, count(*) from {tbl} group by double': "7.7\t2\n9.9\t10"
}
t = start_server(len(select_only_requests) * 2 + (len(insert_requests) + len(select_requests)) * 2)
t.start()
# test table with url engine
test_select(table_name="test_table_select", requests=select_only_requests.keys(), answers=select_only_requests.values(), test_data=test_data)
# test table function url
test_select(requests=select_only_requests.keys(), answers=select_only_requests.values(), test_data=test_data)
#test insert into table with url engine
test_insert(table_name="test_table_insert", requests_insert=insert_requests, requests_select=select_requests.keys(), answers=select_requests.values())
#test insert into table function url
test_insert(requests_insert=insert_requests, requests_select=select_requests.keys(), answers=select_requests.values())
t.join()
print("PASSED")
if __name__ == "__main__":
try:
main()
except:
os._exit(1)

View File

@ -0,0 +1 @@
PASSED

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python $CURDIR/00646_url_engine.python