Merge branch 'master' of github.com:ClickHouse/ClickHouse

Sergei Shtykov 2019-12-04 09:48:47 +03:00
commit 3724b8ba30
33 changed files with 547 additions and 282 deletions

View File

@@ -105,6 +105,11 @@ public:
        return data->getFloat64(0);
    }

+    Float32 getFloat32(size_t) const override
+    {
+        return data->getFloat32(0);
+    }
+
    bool isNullAt(size_t) const override
    {
        return data->isNullAt(0);

View File

@@ -59,6 +59,7 @@ public:
    UInt64 getUInt(size_t n) const override { return getDictionary().getUInt(getIndexes().getUInt(n)); }
    Int64 getInt(size_t n) const override { return getDictionary().getInt(getIndexes().getUInt(n)); }
    Float64 getFloat64(size_t n) const override { return getDictionary().getInt(getIndexes().getFloat64(n)); }
+    Float32 getFloat32(size_t n) const override { return getDictionary().getInt(getIndexes().getFloat32(n)); }
    bool getBool(size_t n) const override { return getDictionary().getInt(getIndexes().getBool(n)); }
    bool isNullAt(size_t n) const override { return getDictionary().isNullAt(getIndexes().getUInt(n)); }
    ColumnPtr cut(size_t start, size_t length) const override

View File

@@ -66,6 +66,7 @@ public:
    UInt64 getUInt(size_t n) const override { return getNestedColumn()->getUInt(n); }
    Int64 getInt(size_t n) const override { return getNestedColumn()->getInt(n); }
    Float64 getFloat64(size_t n) const override { return getNestedColumn()->getFloat64(n); }
+    Float32 getFloat32(size_t n) const override { return getNestedColumn()->getFloat32(n); }
    bool getBool(size_t n) const override { return getNestedColumn()->getBool(n); }
    bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); }
    StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;

View File

@@ -222,6 +222,12 @@ Float64 ColumnVector<T>::getFloat64(size_t n) const
    return static_cast<Float64>(data[n]);
}

+template <typename T>
+Float32 ColumnVector<T>::getFloat32(size_t n) const
+{
+    return static_cast<Float32>(data[n]);
+}
+
template <typename T>
void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{

View File

@@ -205,6 +205,7 @@ public:
    UInt64 get64(size_t n) const override;
    Float64 getFloat64(size_t n) const override;
+    Float32 getFloat32(size_t n) const override;

    UInt64 getUInt(size_t n) const override
    {

View File

@@ -100,6 +100,11 @@ public:
        throw Exception("Method getFloat64 is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
    }

+    virtual Float32 getFloat32(size_t /*n*/) const
+    {
+        throw Exception("Method getFloat32 is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
+    }
+
    /** If column is numeric, return value of n-th element, casted to UInt64.
      * For NULL values of Nullable column it is allowed to return arbitrary value.
      * Otherwise throw an exception.
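The new accessor follows IColumn's usual pattern: a throwing default in the base interface, overridden by every column type that can actually produce the value. A minimal self-contained sketch of that dispatch (the class names here are illustrative, not the real hierarchy):

    #include <cstddef>
    #include <cstdio>
    #include <stdexcept>
    #include <string>

    struct IColumnLike
    {
        virtual ~IColumnLike() = default;
        virtual std::string getName() const = 0;

        /// Default implementation: most columns cannot produce a Float32.
        virtual float getFloat32(size_t /*n*/) const
        {
            throw std::runtime_error("Method getFloat32 is not supported for " + getName());
        }
    };

    struct Float32ColumnLike : IColumnLike
    {
        std::string getName() const override { return "Float32"; }
        float getFloat32(size_t /*n*/) const override { return 3.5f; }
    };

    int main()
    {
        Float32ColumnLike col;
        const IColumnLike & base = col;
        printf("%f\n", base.getFloat32(0)); /// virtual dispatch picks the override
    }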

View File

@@ -464,6 +464,7 @@ namespace ErrorCodes
    extern const int CANNOT_GET_CREATE_DICTIONARY_QUERY = 487;
    extern const int UNKNOWN_DICTIONARY = 488;
    extern const int INCORRECT_DICTIONARY_DEFINITION = 489;
+    extern const int CANNOT_FORMAT_DATETIME = 490;

    extern const int KEEPER_EXCEPTION = 999;
    extern const int POCO_EXCEPTION = 1000;

View File

@@ -91,19 +91,7 @@ private:
    template <typename T>
    static inline void writeNumber2(char * p, T v)
    {
-        static const char digits[201] =
-            "00010203040506070809"
-            "10111213141516171819"
-            "20212223242526272829"
-            "30313233343536373839"
-            "40414243444546474849"
-            "50515253545556575859"
-            "60616263646566676869"
-            "70717273747576777879"
-            "80818283848586878889"
-            "90919293949596979899";
-
-        memcpy(p, &digits[v * 2], 2);
+        memcpy(p, &digits100[v * 2], 2);
    }

    template <typename T>
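The lookup-table trick this commit deduplicates (the shared table now lives in WriteHelpers.h as digits100): the table packs the zero-padded strings "00" through "99" back to back, so index v * 2 points at the textual form of v and a single two-byte memcpy replaces two divisions by ten. A minimal standalone demonstration:

    #include <cstdio>
    #include <cstring>

    static const char digits100[201] =
        "00010203040506070809"
        "10111213141516171819"
        "20212223242526272829"
        "30313233343536373839"
        "40414243444546474849"
        "50515253545556575859"
        "60616263646566676869"
        "70717273747576777879"
        "80818283848586878889"
        "90919293949596979899";

    int main()
    {
        char buf[3] = {};
        memcpy(buf, &digits100[7 * 2], 2);  /// "07" - zero padding comes for free
        printf("%s\n", buf);
        memcpy(buf, &digits100[42 * 2], 2); /// "42"
        printf("%s\n", buf);
    }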

View File

@@ -7,7 +7,7 @@
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionFactory.h>
#include <ext/range.h>
-#include <math.h>
+#include <cmath>
#include <array>
@@ -21,19 +21,32 @@ namespace ErrorCodes
    extern const int LOGICAL_ERROR;
}

+/** https://en.wikipedia.org/wiki/Great-circle_distance
+ *
+ * The function calculates distance in meters between two points on Earth specified by longitude and latitude in degrees.
+ * The function uses great circle distance formula https://en.wikipedia.org/wiki/Great-circle_distance .
+ * Throws exception when one or several input values are not within reasonable bounds.
+ * Latitude must be in [-90, 90], longitude must be [-180, 180].
+ * Original code of this implementation of this function is here https://github.com/sphinxsearch/sphinx/blob/409f2c2b5b2ff70b04e38f92b6b1a890326bad65/src/sphinxexpr.cpp#L3825.
+ * Andrey Aksenov, the author of original code, permitted to use this code in ClickHouse under the Apache 2.0 license.
+ * Presentation about this code from Highload++ Siberia 2019 is here https://github.com/ClickHouse/ClickHouse/files/3324740/1_._._GEODIST_._.pdf
+ * The main idea of this implementation is optimisations based on Taylor series, trigonometric identity and calculated constants once for cosine, arcsine(sqrt) and look up table.
+ */

namespace
{

-const double PI = 3.14159265358979323846;
-const float TO_RADF = static_cast<float>(PI / 180.0);
-const float TO_RADF2 = static_cast<float>(PI / 360.0);
-const int GEODIST_TABLE_COS = 1024; // maxerr 0.00063%
-const int GEODIST_TABLE_ASIN = 512;
-const int GEODIST_TABLE_K = 1024;
+constexpr double PI = 3.14159265358979323846;
+constexpr float TO_RADF = static_cast<float>(PI / 180.0);
+constexpr float TO_RADF2 = static_cast<float>(PI / 360.0);
+constexpr size_t GEODIST_TABLE_COS = 1024; // maxerr 0.00063%
+constexpr size_t GEODIST_TABLE_ASIN = 512;
+constexpr size_t GEODIST_TABLE_K = 1024;

float g_GeoCos[GEODIST_TABLE_COS + 1];    /// cos(x) table
float g_GeoAsin[GEODIST_TABLE_ASIN + 1];  /// asin(sqrt(x)) table
-float g_GeoFlatK[GEODIST_TABLE_K + 1][2]; /// geodistAdaptive() flat ellipsoid method k1,k2 coeffs table
+float g_GeoFlatK[GEODIST_TABLE_K + 1][2]; /// geodistAdaptive() flat ellipsoid method k1, k2 coeffs table

inline double sqr(double v)
{

@@ -48,7 +61,7 @@ inline float fsqr(float v)
void geodistInit()
{
    for (size_t i = 0; i <= GEODIST_TABLE_COS; ++i)
-        g_GeoCos[i] = static_cast<float>(cos(2 * PI * i / GEODIST_TABLE_COS)); // [0, 2pi] -> [0, COSTABLE]
+        g_GeoCos[i] = static_cast<float>(cos(2 * PI * i / GEODIST_TABLE_COS)); // [0, 2 * pi] -> [0, COSTABLE]

    for (size_t i = 0; i <= GEODIST_TABLE_ASIN; ++i)
        g_GeoAsin[i] = static_cast<float>(asin(

@@ -56,7 +69,7 @@ void geodistInit()
    for (size_t i = 0; i <= GEODIST_TABLE_K; ++i)
    {
-        double x = PI * i / GEODIST_TABLE_K - PI * 0.5; // [-pi/2, pi/2] -> [0, KTABLE]
+        double x = PI * i / GEODIST_TABLE_K - PI * 0.5; // [-pi / 2, pi / 2] -> [0, KTABLE]
        g_GeoFlatK[i][0] = static_cast<float>(sqr(111132.09 - 566.05 * cos(2 * x) + 1.20 * cos(4 * x)));
        g_GeoFlatK[i][1] = static_cast<float>(sqr(111415.13 * cos(x) - 94.55 * cos(3 * x) + 0.12 * cos(5 * x)));
    }

@@ -86,11 +99,10 @@ inline float geodistFastSin(float x)
    float y = static_cast<float>(fabs(x) * GEODIST_TABLE_COS / PI / 2);
    int i = static_cast<int>(y);
    y -= i;
-    i = (i - GEODIST_TABLE_COS / 4) & (GEODIST_TABLE_COS - 1); // cos(x-pi/2)=sin(x), costable/4=pi/2
+    i = (i - GEODIST_TABLE_COS / 4) & (GEODIST_TABLE_COS - 1); // cos(x - pi / 2) = sin(x), costable / 4 = pi / 2
    return g_GeoCos[i] + (g_GeoCos[i + 1] - g_GeoCos[i]) * y;
}

/// fast implementation of asin(sqrt(x))
/// max error in floats 0.00369%, in doubles 0.00072%
inline float geodistFastAsinSqrt(float x)

@@ -110,17 +122,10 @@ inline float geodistFastAsinSqrt(float x)
    }
    return static_cast<float>(asin(sqrt(x))); // distance over 17083km, just compute honestly
}

}

-/**
- * The function calculates distance in meters between two points on Earth specified by longitude and latitude in degrees.
- * The function uses great circle distance formula https://en.wikipedia.org/wiki/Great-circle_distance .
- * Throws exception when one or several input values are not within reasonable bounds.
- * Latitude must be in [-90, 90], longitude must be [-180, 180].
- * Original code of this implementation of this function is here https://github.com/sphinxsearch/sphinx/blob/409f2c2b5b2ff70b04e38f92b6b1a890326bad65/src/sphinxexpr.cpp#L3825.
- * Andrey Aksenov, the author of original code, permitted to use this code in ClickHouse under the Apache 2.0 license.
- * Presentation about this code from Highload++ Siberia 2019 is here https://github.com/ClickHouse/ClickHouse/files/3324740/1_._._GEODIST_._.pdf
- * The main idea of this implementation is optimisations based on Taylor series, trigonometric identity and calculated constants once for cosine, arcsine(sqrt) and look up table.
- */

class FunctionGreatCircleDistance : public IFunction
{
public:
@@ -128,134 +133,78 @@ public:
    static FunctionPtr create(const Context &) { return std::make_shared<FunctionGreatCircleDistance>(); }

private:
-    enum class instr_type : uint8_t
-    {
-        get_float_64,
-        get_const_float_64
-    };
-
-    using instr_t = std::pair<instr_type, const IColumn *>;
-    using instrs_t = std::array<instr_t, 4>;
-
    String getName() const override { return name; }
    size_t getNumberOfArguments() const override { return 4; }
+    bool useDefaultImplementationForConstants() const override { return true; }

    DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
    {
        for (const auto arg_idx : ext::range(0, arguments.size()))
        {
            const auto arg = arguments[arg_idx].get();
-            if (!WhichDataType(arg).isFloat64())
+            if (!WhichDataType(arg).isFloat())
                throw Exception(
                    "Illegal type " + arg->getName() + " of argument " + std::to_string(arg_idx + 1) + " of function " + getName() + ". Must be Float64",
                    ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
        }
-        return std::make_shared<DataTypeFloat64>();
+        return std::make_shared<DataTypeFloat32>();
    }

-    instrs_t getInstructions(const Block & block, const ColumnNumbers & arguments, bool & out_const)
-    {
-        instrs_t result;
-        out_const = true;
-
-        for (const auto arg_idx : ext::range(0, arguments.size()))
-        {
-            const auto column = block.getByPosition(arguments[arg_idx]).column.get();
-            if (const auto col = checkAndGetColumn<ColumnVector<Float64>>(column))
-            {
-                out_const = false;
-                result[arg_idx] = instr_t{instr_type::get_float_64, col};
-            }
-            else if (const auto col_const = checkAndGetColumnConst<ColumnVector<Float64>>(column))
-            {
-                result[arg_idx] = instr_t{instr_type::get_const_float_64, col_const};
-            }
-            else
-                throw Exception("Illegal column " + column->getName() + " of argument of function " + getName(),
-                    ErrorCodes::ILLEGAL_COLUMN);
-        }
-        return result;
-    }
-
-    /// https://en.wikipedia.org/wiki/Great-circle_distance
-    Float64 greatCircleDistance(Float64 lon1Deg, Float64 lat1Deg, Float64 lon2Deg, Float64 lat2Deg)
-    {
-        if (lon1Deg < -180 || lon1Deg > 180 ||
-            lon2Deg < -180 || lon2Deg > 180 ||
-            lat1Deg < -90 || lat1Deg > 90 ||
-            lat2Deg < -90 || lat2Deg > 90)
-        {
-            throw Exception("Arguments values out of bounds for function " + getName(),
-                ErrorCodes::ARGUMENT_OUT_OF_BOUND);
-        }
-
-        float dlat = geodistDegDiff(lat1Deg - lat2Deg);
-        float dlon = geodistDegDiff(lon1Deg - lon2Deg);
-
-        if (dlon < 13)
-        {
-            // points are close enough; use flat ellipsoid model
-            // interpolate sqr(k1), sqr(k2) coefficients using latitudes midpoint
-            float m = (lat1Deg + lat2Deg + 180) * GEODIST_TABLE_K / 360; // [-90, 90] degrees -> [0, KTABLE] indexes
-            int i = static_cast<int>(m);
-            i &= (GEODIST_TABLE_K - 1);
-            float kk1 = g_GeoFlatK[i][0] + (g_GeoFlatK[i + 1][0] - g_GeoFlatK[i][0]) * (m - i);
-            float kk2 = g_GeoFlatK[i][1] + (g_GeoFlatK[i + 1][1] - g_GeoFlatK[i][1]) * (m - i);
-            return static_cast<float>(sqrt(kk1 * dlat * dlat + kk2 * dlon * dlon));
-        }
-
-        // points too far away; use haversine
-        static const float D = 2 * 6371000;
-        float a = fsqr(geodistFastSin(dlat * TO_RADF2)) +
-            geodistFastCos(lat1Deg * TO_RADF) * geodistFastCos(lat2Deg * TO_RADF) *
-            fsqr(geodistFastSin(dlon * TO_RADF2));
-        return static_cast<float>(D * geodistFastAsinSqrt(a));
-    }
-
-    void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
-    {
-        const auto size = input_rows_count;
-
-        bool result_is_const{};
-        auto instrs = getInstructions(block, arguments, result_is_const);
-
-        if (result_is_const)
-        {
-            const auto & colLon1 = assert_cast<const ColumnConst *>(block.getByPosition(arguments[0]).column.get())->getValue<Float64>();
-            const auto & colLat1 = assert_cast<const ColumnConst *>(block.getByPosition(arguments[1]).column.get())->getValue<Float64>();
-            const auto & colLon2 = assert_cast<const ColumnConst *>(block.getByPosition(arguments[2]).column.get())->getValue<Float64>();
-            const auto & colLat2 = assert_cast<const ColumnConst *>(block.getByPosition(arguments[3]).column.get())->getValue<Float64>();
-
-            Float64 res = greatCircleDistance(colLon1, colLat1, colLon2, colLat2);
-            block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(size, res);
-        }
-        else
-        {
-            auto dst = ColumnVector<Float64>::create();
-            auto & dst_data = dst->getData();
-            dst_data.resize(size);
-            Float64 vals[instrs.size()];
-
-            for (const auto row : ext::range(0, size))
-            {
-                for (const auto idx : ext::range(0, instrs.size()))
-                {
-                    if (instr_type::get_float_64 == instrs[idx].first)
-                        vals[idx] = assert_cast<const ColumnVector<Float64> *>(instrs[idx].second)->getData()[row];
-                    else if (instr_type::get_const_float_64 == instrs[idx].first)
-                        vals[idx] = assert_cast<const ColumnConst *>(instrs[idx].second)->getValue<Float64>();
-                    else
-                        throw Exception{"Unknown instruction type in implementation of greatCircleDistance function", ErrorCodes::LOGICAL_ERROR};
-                }
-
-                dst_data[row] = greatCircleDistance(vals[0], vals[1], vals[2], vals[3]);
-            }
-
-            block.getByPosition(result).column = std::move(dst);
-        }
-    }
+    Float32 greatCircleDistance(Float32 lon1deg, Float32 lat1deg, Float32 lon2deg, Float32 lat2deg)
+    {
+        if (lon1deg < -180 || lon1deg > 180 ||
+            lon2deg < -180 || lon2deg > 180 ||
+            lat1deg < -90 || lat1deg > 90 ||
+            lat2deg < -90 || lat2deg > 90)
+        {
+            throw Exception("Arguments values out of bounds for function " + getName(),
+                ErrorCodes::ARGUMENT_OUT_OF_BOUND);
+        }
+
+        float lat_diff = geodistDegDiff(lat1deg - lat2deg);
+        float lon_diff = geodistDegDiff(lon1deg - lon2deg);
+
+        if (lon_diff < 13)
+        {
+            // points are close enough; use flat ellipsoid model
+            // interpolate sqr(k1), sqr(k2) coefficients using latitudes midpoint
+            float m = (lat1deg + lat2deg + 180) * GEODIST_TABLE_K / 360; // [-90, 90] degrees -> [0, KTABLE] indexes
+            size_t i = static_cast<size_t>(m) & (GEODIST_TABLE_K - 1);
+            float kk1 = g_GeoFlatK[i][0] + (g_GeoFlatK[i + 1][0] - g_GeoFlatK[i][0]) * (m - i);
+            float kk2 = g_GeoFlatK[i][1] + (g_GeoFlatK[i + 1][1] - g_GeoFlatK[i][1]) * (m - i);
+            return static_cast<float>(sqrt(kk1 * lat_diff * lat_diff + kk2 * lon_diff * lon_diff));
+        }
+        else
+        {
+            // points too far away; use haversine
+            static const float d = 2 * 6371000;
+            float a = fsqr(geodistFastSin(lat_diff * TO_RADF2)) +
+                geodistFastCos(lat1deg * TO_RADF) * geodistFastCos(lat2deg * TO_RADF) *
+                fsqr(geodistFastSin(lon_diff * TO_RADF2));
+            return static_cast<float>(d * geodistFastAsinSqrt(a));
+        }
+    }
+
+    void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
+    {
+        auto dst = ColumnVector<Float32>::create();
+        auto & dst_data = dst->getData();
+        dst_data.resize(input_rows_count);
+
+        const IColumn & col_lon1 = *block.getByPosition(arguments[0]).column;
+        const IColumn & col_lat1 = *block.getByPosition(arguments[1]).column;
+        const IColumn & col_lon2 = *block.getByPosition(arguments[2]).column;
+        const IColumn & col_lat2 = *block.getByPosition(arguments[3]).column;
+
+        for (size_t row_num = 0; row_num < input_rows_count; ++row_num)
+            dst_data[row_num] = greatCircleDistance(
+                col_lon1.getFloat32(row_num), col_lat1.getFloat32(row_num),
+                col_lon2.getFloat32(row_num), col_lat2.getFloat32(row_num));

+        block.getByPosition(result).column = std::move(dst);
+    }
};
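The rewritten function keeps the two-regime scheme described in the comment above: close points use the interpolated flat-ellipsoid coefficients, distant points fall back to the haversine formula a = sin^2(dlat/2) + cos(lat1) * cos(lat2) * sin^2(dlon/2), d = 2R * asin(sqrt(a)), with sin, cos and asin served from the precomputed tables. A standalone sketch of the table-plus-interpolation idea behind geodistFastSin (the table size and test value here are arbitrary choices, not taken from the commit):

    #include <cmath>
    #include <cstdio>

    constexpr double PI = 3.14159265358979323846;
    constexpr int TABLE = 1024;
    static float cos_table[TABLE + 1];

    int main()
    {
        /// Fill one full period of cos, as geodistInit() does.
        for (int i = 0; i <= TABLE; ++i)
            cos_table[i] = static_cast<float>(cos(2 * PI * i / TABLE));

        /// sin(x) == cos(x - pi/2): shift the index by a quarter period
        /// and interpolate linearly between neighbouring table entries.
        float x = 1.0f;
        float y = static_cast<float>(x * TABLE / PI / 2);
        int i = static_cast<int>(y);
        y -= i;
        i = (i - TABLE / 4) & (TABLE - 1);
        float approx = cos_table[i] + (cos_table[i + 1] - cos_table[i]) * y;
        printf("approx = %f, std = %f\n", approx, sinf(x)); /// ~0.8414 vs 0.8415
    }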

View File

@@ -1,6 +1,7 @@
#include <IO/ReadBufferFromS3.h>

#include <IO/ReadBufferFromIStream.h>
+#include <IO/S3Common.h>
#include <common/logger_useful.h>
@@ -10,13 +11,12 @@ namespace DB

const int DEFAULT_S3_MAX_FOLLOW_GET_REDIRECT = 2;

-ReadBufferFromS3::ReadBufferFromS3(Poco::URI uri_,
-    const ConnectionTimeouts & timeouts,
-    const Poco::Net::HTTPBasicCredentials & credentials,
-    size_t buffer_size_)
+ReadBufferFromS3::ReadBufferFromS3(const Poco::URI & uri_,
+    const String & access_key_id_,
+    const String & secret_access_key_,
+    const ConnectionTimeouts & timeouts)
    : ReadBuffer(nullptr, 0)
    , uri {uri_}
-    , method {Poco::Net::HTTPRequest::HTTP_GET}
    , session {makeHTTPSession(uri_, timeouts)}
{
    Poco::Net::HTTPResponse response;
@@ -28,11 +28,13 @@ ReadBufferFromS3::ReadBufferFromS3(Poco::URI uri_,
        if (uri.getPath().empty())
            uri.setPath("/");

-        request = std::make_unique<Poco::Net::HTTPRequest>(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
+        request = std::make_unique<Poco::Net::HTTPRequest>(
+            Poco::Net::HTTPRequest::HTTP_GET,
+            uri.getPathAndQuery(),
+            Poco::Net::HTTPRequest::HTTP_1_1);
        request->setHost(uri.getHost()); // use original, not resolved host name in header

-        if (!credentials.getUsername().empty())
-            credentials.authenticate(*request);
+        S3Helper::authenticateRequest(*request, access_key_id_, secret_access_key_);

        LOG_TRACE((&Logger::get("ReadBufferFromS3")), "Sending request to " << uri.toString());
@@ -54,7 +56,7 @@ ReadBufferFromS3::ReadBufferFromS3(Poco::URI uri_,
    }

    assertResponseIsOk(*request, response, *istr);
-    impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
+    impl = std::make_unique<ReadBufferFromIStream>(*istr, DBMS_DEFAULT_BUFFER_SIZE);
}

View File

@@ -17,17 +17,15 @@ class ReadBufferFromS3 : public ReadBuffer
{
protected:
    Poco::URI uri;
-    std::string method;
    HTTPSessionPtr session;
    std::istream * istr; /// owned by session
    std::unique_ptr<ReadBuffer> impl;

public:
-    explicit ReadBufferFromS3(Poco::URI uri_,
-        const ConnectionTimeouts & timeouts = {},
-        const Poco::Net::HTTPBasicCredentials & credentials = {},
-        size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
+    explicit ReadBufferFromS3(const Poco::URI & uri_,
+        const String & access_key_id_,
+        const String & secret_access_key_,
+        const ConnectionTimeouts & timeouts = {});

    bool nextImpl() override;
};
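A hypothetical usage sketch against the new constructor (the URL is a placeholder, the keys are the sample credentials from the AWS documentation, and timeouts is assumed to come from the caller; an empty access_key_id skips signing, per S3Helper::authenticateRequest):

    #include <IO/ReadBufferFromS3.h>
    #include <IO/ReadHelpers.h>
    #include <Poco/URI.h>

    /// Read a whole S3 object through the credential-aware buffer.
    Poco::URI uri("https://my-bucket.s3.amazonaws.com/data.csv");
    DB::ReadBufferFromS3 in(uri,
        "AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", timeouts);

    std::string content;
    DB::readStringUntilEOF(content, in);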

dbms/src/IO/S3Common.cpp (new file, 60 lines)
View File

@@ -0,0 +1,60 @@
#include <IO/S3Common.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>

#include <iterator>
#include <sstream>

#include <Poco/Base64Encoder.h>
#include <Poco/HMACEngine.h>
#include <Poco/SHA1Engine.h>
#include <Poco/URI.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_FORMAT_DATETIME;
}

void S3Helper::authenticateRequest(
    Poco::Net::HTTPRequest & request,
    const String & access_key_id,
    const String & secret_access_key)
{
    /// See https://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html

    if (access_key_id.empty())
        return;

    /// Limitations:
    /// 1. Virtual hosted-style requests are not supported (e.g. `http://johnsmith.net.s3.amazonaws.com/homepage.html`).
    /// 2. AMZ headers are not supported (TODO).

    if (!request.has("Date"))
    {
        WriteBufferFromOwnString out;
        writeDateTimeTextRFC1123(time(nullptr), out, DateLUT::instance("UTC"));
        request.set("Date", out.str());
    }

    String string_to_sign = request.getMethod() + "\n"
        + request.get("Content-MD5", "") + "\n"
        + request.get("Content-Type", "") + "\n"
        + request.get("Date") + "\n"
        + Poco::URI(request.getURI()).getPathAndQuery();

    Poco::HMACEngine<Poco::SHA1Engine> engine(secret_access_key);
    engine.update(string_to_sign);
    auto digest = engine.digest();
    std::ostringstream signature;
    Poco::Base64Encoder encoder(signature);
    std::copy(digest.begin(), digest.end(), std::ostream_iterator<char>(encoder));
    encoder.close();

    request.set("Authorization", "AWS " + access_key_id + ":" + signature.str());
}

}
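A hypothetical usage sketch for the helper (the path is a placeholder, the keys are the sample credentials from the AWS documentation). The helper fills in Date if absent and sets Authorization to "AWS <access_key_id>:<signature>", where the signature is the Base64-encoded HMAC-SHA1 of the string-to-sign built above:

    #include <IO/S3Common.h>
    #include <Poco/Net/HTTPRequest.h>
    #include <iostream>

    int main()
    {
        Poco::Net::HTTPRequest request(
            Poco::Net::HTTPRequest::HTTP_GET, "/my-bucket/data.csv", Poco::Net::HTTPRequest::HTTP_1_1);
        request.setHost("s3.amazonaws.com");

        DB::S3Helper::authenticateRequest(
            request, "AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");

        std::cout << request.get("Date") << '\n'
                  << request.get("Authorization") << '\n';
    }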

dbms/src/IO/S3Common.h (new file, 19 lines)
View File

@@ -0,0 +1,19 @@
#pragma once

#include <Core/Types.h>
#include <Poco/Net/HTTPRequest.h>

namespace DB
{

namespace S3Helper
{
    void authenticateRequest(
        Poco::Net::HTTPRequest & request,
        const String & access_key_id,
        const String & secret_access_key);
};

}

View File

@@ -1,5 +1,6 @@
#include <IO/WriteBufferFromS3.h>
+#include <IO/S3Common.h>
#include <IO/WriteHelpers.h>

#include <Poco/DOM/AutoPtr.h>
@@ -30,22 +31,22 @@ namespace ErrorCodes

WriteBufferFromS3::WriteBufferFromS3(
    const Poco::URI & uri_,
+    const String & access_key_id_,
+    const String & secret_access_key_,
    size_t minimum_upload_part_size_,
-    const ConnectionTimeouts & timeouts_,
-    const Poco::Net::HTTPBasicCredentials & credentials, size_t buffer_size_
-)
-    : BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
+    const ConnectionTimeouts & timeouts_)
+    : BufferWithOwnMemory<WriteBuffer>(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
    , uri {uri_}
+    , access_key_id {access_key_id_}
+    , secret_access_key {secret_access_key_}
    , minimum_upload_part_size {minimum_upload_part_size_}
    , timeouts {timeouts_}
-    , auth_request {Poco::Net::HTTPRequest::HTTP_PUT, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1}
    , temporary_buffer {std::make_unique<WriteBufferFromString>(buffer_string)}
    , last_part_size {0}
{
-    if (!credentials.getUsername().empty())
-        credentials.authenticate(auth_request);
-
    initiate();
+
+    /// FIXME: Implement rest of S3 authorization.
}
@@ -113,11 +114,7 @@ void WriteBufferFromS3::initiate()
    request_ptr = std::make_unique<Poco::Net::HTTPRequest>(Poco::Net::HTTPRequest::HTTP_POST, initiate_uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
    request_ptr->setHost(initiate_uri.getHost()); // use original, not resolved host name in header

-    if (auth_request.hasCredentials())
-    {
-        Poco::Net::HTTPBasicCredentials credentials(auth_request);
-        credentials.authenticate(*request_ptr);
-    }
+    S3Helper::authenticateRequest(*request_ptr, access_key_id, secret_access_key);

    request_ptr->setContentLength(0);
@@ -179,11 +176,7 @@ void WriteBufferFromS3::writePart(const String & data)
    request_ptr = std::make_unique<Poco::Net::HTTPRequest>(Poco::Net::HTTPRequest::HTTP_PUT, part_uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
    request_ptr->setHost(part_uri.getHost()); // use original, not resolved host name in header

-    if (auth_request.hasCredentials())
-    {
-        Poco::Net::HTTPBasicCredentials credentials(auth_request);
-        credentials.authenticate(*request_ptr);
-    }
+    S3Helper::authenticateRequest(*request_ptr, access_key_id, secret_access_key);

    request_ptr->setExpectContinue(true);
@@ -252,11 +245,7 @@ void WriteBufferFromS3::complete()
    request_ptr = std::make_unique<Poco::Net::HTTPRequest>(Poco::Net::HTTPRequest::HTTP_POST, complete_uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
    request_ptr->setHost(complete_uri.getHost()); // use original, not resolved host name in header

-    if (auth_request.hasCredentials())
-    {
-        Poco::Net::HTTPBasicCredentials credentials(auth_request);
-        credentials.authenticate(*request_ptr);
-    }
+    S3Helper::authenticateRequest(*request_ptr, access_key_id, secret_access_key);

    request_ptr->setExpectContinue(true);

View File

@@ -21,9 +21,10 @@ class WriteBufferFromS3 : public BufferWithOwnMemory<WriteBuffer>
{
private:
    Poco::URI uri;
+    String access_key_id;
+    String secret_access_key;
    size_t minimum_upload_part_size;
    ConnectionTimeouts timeouts;
-    Poco::Net::HTTPRequest auth_request;
    String buffer_string;
    std::unique_ptr<WriteBufferFromString> temporary_buffer;
    size_t last_part_size;

@@ -35,10 +36,10 @@ private:
public:
    explicit WriteBufferFromS3(const Poco::URI & uri,
+        const String & access_key_id,
+        const String & secret_access_key,
        size_t minimum_upload_part_size_,
-        const ConnectionTimeouts & timeouts = {},
-        const Poco::Net::HTTPBasicCredentials & credentials = {},
-        size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
+        const ConnectionTimeouts & timeouts = {});

    void nextImpl() override;
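A hypothetical usage sketch against the new signature (URL, keys and part size are placeholders; timeouts is assumed to come from the caller). Buffered data is sent to S3 as a multipart upload in parts of at least minimum_upload_part_size bytes, and the upload is completed when the buffer is finalized:

    #include <IO/WriteBufferFromS3.h>
    #include <IO/WriteHelpers.h>
    #include <Poco/URI.h>

    Poco::URI uri("https://my-bucket.s3.amazonaws.com/out.csv");
    DB::WriteBufferFromS3 out(uri,
        "AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
        /* minimum_upload_part_size */ 10 * 1024 * 1024, timeouts);

    DB::writeString("hello,world\n", out);
    out.next(); /// flush the current buffer contents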

View File

@@ -568,45 +568,46 @@ inline void writeUUIDText(const UUID & uuid, WriteBuffer & buf)
    buf.write(s, sizeof(s));
}

+static const char digits100[201] =
+    "00010203040506070809"
+    "10111213141516171819"
+    "20212223242526272829"
+    "30313233343536373839"
+    "40414243444546474849"
+    "50515253545556575859"
+    "60616263646566676869"
+    "70717273747576777879"
+    "80818283848586878889"
+    "90919293949596979899";
+
/// in YYYY-MM-DD format
template <char delimiter = '-'>
inline void writeDateText(const LocalDate & date, WriteBuffer & buf)
{
-    static const char digits[201] =
-        "00010203040506070809"
-        "10111213141516171819"
-        "20212223242526272829"
-        "30313233343536373839"
-        "40414243444546474849"
-        "50515253545556575859"
-        "60616263646566676869"
-        "70717273747576777879"
-        "80818283848586878889"
-        "90919293949596979899";
-
    if (buf.position() + 10 <= buf.buffer().end())
    {
-        memcpy(buf.position(), &digits[date.year() / 100 * 2], 2);
+        memcpy(buf.position(), &digits100[date.year() / 100 * 2], 2);
        buf.position() += 2;
-        memcpy(buf.position(), &digits[date.year() % 100 * 2], 2);
+        memcpy(buf.position(), &digits100[date.year() % 100 * 2], 2);
        buf.position() += 2;
        *buf.position() = delimiter;
        ++buf.position();
-        memcpy(buf.position(), &digits[date.month() * 2], 2);
+        memcpy(buf.position(), &digits100[date.month() * 2], 2);
        buf.position() += 2;
        *buf.position() = delimiter;
        ++buf.position();
-        memcpy(buf.position(), &digits[date.day() * 2], 2);
+        memcpy(buf.position(), &digits100[date.day() * 2], 2);
        buf.position() += 2;
    }
    else
    {
-        buf.write(&digits[date.year() / 100 * 2], 2);
-        buf.write(&digits[date.year() % 100 * 2], 2);
+        buf.write(&digits100[date.year() / 100 * 2], 2);
+        buf.write(&digits100[date.year() % 100 * 2], 2);
        buf.write(delimiter);
-        buf.write(&digits[date.month() * 2], 2);
+        buf.write(&digits100[date.month() * 2], 2);
        buf.write(delimiter);
-        buf.write(&digits[date.day() * 2], 2);
+        buf.write(&digits100[date.day() * 2], 2);
    }
}
@@ -628,59 +629,47 @@ inline void writeDateText(DayNum date, WriteBuffer & buf)
template <char date_delimeter = '-', char time_delimeter = ':', char between_date_time_delimiter = ' '>
inline void writeDateTimeText(const LocalDateTime & datetime, WriteBuffer & buf)
{
-    static const char digits[201] =
-        "00010203040506070809"
-        "10111213141516171819"
-        "20212223242526272829"
-        "30313233343536373839"
-        "40414243444546474849"
-        "50515253545556575859"
-        "60616263646566676869"
-        "70717273747576777879"
-        "80818283848586878889"
-        "90919293949596979899";
-
    if (buf.position() + 19 <= buf.buffer().end())
    {
-        memcpy(buf.position(), &digits[datetime.year() / 100 * 2], 2);
+        memcpy(buf.position(), &digits100[datetime.year() / 100 * 2], 2);
        buf.position() += 2;
-        memcpy(buf.position(), &digits[datetime.year() % 100 * 2], 2);
+        memcpy(buf.position(), &digits100[datetime.year() % 100 * 2], 2);
        buf.position() += 2;
        *buf.position() = date_delimeter;
        ++buf.position();
-        memcpy(buf.position(), &digits[datetime.month() * 2], 2);
+        memcpy(buf.position(), &digits100[datetime.month() * 2], 2);
        buf.position() += 2;
        *buf.position() = date_delimeter;
        ++buf.position();
-        memcpy(buf.position(), &digits[datetime.day() * 2], 2);
+        memcpy(buf.position(), &digits100[datetime.day() * 2], 2);
        buf.position() += 2;
        *buf.position() = between_date_time_delimiter;
        ++buf.position();
-        memcpy(buf.position(), &digits[datetime.hour() * 2], 2);
+        memcpy(buf.position(), &digits100[datetime.hour() * 2], 2);
        buf.position() += 2;
        *buf.position() = time_delimeter;
        ++buf.position();
-        memcpy(buf.position(), &digits[datetime.minute() * 2], 2);
+        memcpy(buf.position(), &digits100[datetime.minute() * 2], 2);
        buf.position() += 2;
        *buf.position() = time_delimeter;
        ++buf.position();
-        memcpy(buf.position(), &digits[datetime.second() * 2], 2);
+        memcpy(buf.position(), &digits100[datetime.second() * 2], 2);
        buf.position() += 2;
    }
    else
    {
-        buf.write(&digits[datetime.year() / 100 * 2], 2);
-        buf.write(&digits[datetime.year() % 100 * 2], 2);
+        buf.write(&digits100[datetime.year() / 100 * 2], 2);
+        buf.write(&digits100[datetime.year() % 100 * 2], 2);
        buf.write(date_delimeter);
-        buf.write(&digits[datetime.month() * 2], 2);
+        buf.write(&digits100[datetime.month() * 2], 2);
        buf.write(date_delimeter);
-        buf.write(&digits[datetime.day() * 2], 2);
+        buf.write(&digits100[datetime.day() * 2], 2);
        buf.write(between_date_time_delimiter);
-        buf.write(&digits[datetime.hour() * 2], 2);
+        buf.write(&digits100[datetime.hour() * 2], 2);
        buf.write(time_delimeter);
-        buf.write(&digits[datetime.minute() * 2], 2);
+        buf.write(&digits100[datetime.minute() * 2], 2);
        buf.write(time_delimeter);
-        buf.write(&digits[datetime.second() * 2], 2);
+        buf.write(&digits100[datetime.second() * 2], 2);
    }
}
@@ -707,6 +696,33 @@ inline void writeDateTimeText(time_t datetime, WriteBuffer & buf, const DateLUTI
}

+/// In the RFC 1123 format: "Tue, 03 Dec 2019 00:11:50 GMT". You must provide GMT DateLUT.
+/// This is needed for HTTP requests.
+inline void writeDateTimeTextRFC1123(time_t datetime, WriteBuffer & buf, const DateLUTImpl & date_lut)
+{
+    const auto & values = date_lut.getValues(datetime);
+
+    static const char week_days[3 * 8 + 1] = "XXX" "Mon" "Tue" "Wed" "Thu" "Fri" "Sat" "Sun";
+    static const char months[3 * 13 + 1] = "XXX" "Jan" "Feb" "Mar" "Apr" "May" "Jun" "Jul" "Aug" "Sep" "Oct" "Nov" "Dec";
+
+    buf.write(&week_days[values.day_of_week * 3], 3);
+    buf.write(", ", 2);
+    buf.write(&digits100[values.day_of_month * 2], 2);
+    buf.write(' ');
+    buf.write(&months[values.month * 3], 3);
+    buf.write(' ');
+    buf.write(&digits100[values.year / 100 * 2], 2);
+    buf.write(&digits100[values.year % 100 * 2], 2);
+    buf.write(' ');
+    buf.write(&digits100[date_lut.toHour(datetime) * 2], 2);
+    buf.write(':');
+    buf.write(&digits100[date_lut.toMinute(datetime) * 2], 2);
+    buf.write(':');
+    buf.write(&digits100[date_lut.toSecond(datetime) * 2], 2);
+    buf.write(" GMT", 4);
+}
+
/// Methods for output in binary format.
template <typename T>
inline std::enable_if_t<is_arithmetic_v<T>, void>

View File

@@ -0,0 +1,14 @@
#include <gtest/gtest.h>

#include <common/DateLUT.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>

TEST(RFC1123, Test)
{
    using namespace DB;
    WriteBufferFromOwnString out;
    writeDateTimeTextRFC1123(1111111111, out, DateLUT::instance("UTC"));
    ASSERT_EQ(out.str(), "Fri, 18 Mar 2005 01:58:31 GMT");
}

View File

@@ -167,7 +167,7 @@ private:
    size_t canMoveEqualsToJoinOn(const ASTFunction & node)
    {
        if (!node.arguments)
-            throw Exception("Logical error: function requires argiment", ErrorCodes::LOGICAL_ERROR);
+            throw Exception("Logical error: function requires arguments", ErrorCodes::LOGICAL_ERROR);

        if (node.arguments->children.size() != 2)
            return false;

View File

@@ -27,7 +27,7 @@ struct ExternalLoaderConfigSettings
};

-/** Iterface for manage user-defined objects.
+/** Interface for manage user-defined objects.
  * Monitors configuration file and automatically reloads objects in separate threads.
  * The monitoring thread wakes up every 'check_period_sec' seconds and checks
  * modification time of objects' configuration file. If said time is greater than

View File

@@ -28,7 +28,7 @@ void ASTSelectWithUnionQuery::formatQueryImpl(const FormatSettings & settings, F
    if (it != list_of_selects->children.begin())
        settings.ostr
            << settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "")
-            << "UNION ALL" << (settings.hilite ? hilite_keyword : "")
+            << "UNION ALL" << (settings.hilite ? hilite_none : "")
            << settings.nl_or_ws;

    (*it)->formatImpl(settings, state, frame);

View File

@@ -35,11 +35,13 @@ static Columns unmuteColumns(MutableColumns && mut_columns)
Chunk::Chunk(MutableColumns columns_, UInt64 num_rows_)
    : columns(unmuteColumns(std::move(columns_))), num_rows(num_rows_)
{
+    checkNumRowsIsConsistent();
}

Chunk::Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_)
    : columns(unmuteColumns(std::move(columns_))), num_rows(num_rows_), chunk_info(std::move(chunk_info_))
{
+    checkNumRowsIsConsistent();
}

Chunk Chunk::clone() const

View File

@@ -43,7 +43,6 @@ Chunk IRowInputFormat::generate()
    size_t num_columns = header.columns();
    MutableColumns columns = header.cloneEmptyColumns();
-    size_t prev_rows = total_rows;

    ///auto chunk_missing_values = std::make_unique<ChunkMissingValues>();
    block_missing_values.clear();

@@ -149,7 +148,8 @@ Chunk IRowInputFormat::generate()
        return {};
    }

-    Chunk chunk(std::move(columns), total_rows - prev_rows);
+    auto num_rows = columns.front()->size();
+    Chunk chunk(std::move(columns), num_rows);
    //chunk.setChunkInfo(std::move(chunk_missing_values));
    return chunk;
}

View File

@@ -32,6 +32,8 @@ namespace
    {
    public:
        StorageS3BlockInputStream(const Poco::URI & uri,
+            const String & access_key_id,
+            const String & secret_access_key,
            const String & format,
            const String & name_,
            const Block & sample_block,

@@ -41,7 +43,7 @@ namespace
            const CompressionMethod compression_method)
            : name(name_)
        {
-            read_buf = getReadBuffer<ReadBufferFromS3>(compression_method, uri, timeouts);
+            read_buf = getReadBuffer<ReadBufferFromS3>(compression_method, uri, access_key_id, secret_access_key, timeouts);
            reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
        }
@@ -80,6 +82,8 @@ namespace
    {
    public:
        StorageS3BlockOutputStream(const Poco::URI & uri,
+            const String & access_key_id,
+            const String & secret_access_key,
            const String & format,
            UInt64 min_upload_part_size,
            const Block & sample_block_,

@@ -88,7 +92,13 @@ namespace
            const CompressionMethod compression_method)
            : sample_block(sample_block_)
        {
-            write_buf = getWriteBuffer<WriteBufferFromS3>(compression_method, uri, min_upload_part_size, timeouts);
+            write_buf = getWriteBuffer<WriteBufferFromS3>(
+                compression_method,
+                uri,
+                access_key_id,
+                secret_access_key,
+                min_upload_part_size,
+                timeouts);
            writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
        }
@@ -124,6 +134,8 @@ namespace
StorageS3::StorageS3(
    const Poco::URI & uri_,
+    const String & access_key_id_,
+    const String & secret_access_key_,
    const std::string & database_name_,
    const std::string & table_name_,
    const String & format_name_,

@@ -134,6 +146,8 @@ StorageS3::StorageS3(
    const String & compression_method_ = "")
    : IStorage(columns_)
    , uri(uri_)
+    , access_key_id(access_key_id_)
+    , secret_access_key(secret_access_key_)
    , context_global(context_)
    , format_name(format_name_)
    , database_name(database_name_)
@@ -156,6 +170,8 @@ BlockInputStreams StorageS3::read(
{
    BlockInputStreamPtr block_input = std::make_shared<StorageS3BlockInputStream>(
        uri,
+        access_key_id,
+        secret_access_key,
        format_name,
        getName(),
        getHeaderBlock(column_names),
@@ -179,7 +195,13 @@ void StorageS3::rename(const String & /*new_path_to_db*/, const String & new_dat
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
    return std::make_shared<StorageS3BlockOutputStream>(
-        uri, format_name, min_upload_part_size, getSampleBlock(), context_global,
+        uri,
+        access_key_id,
+        secret_access_key,
+        format_name,
+        min_upload_part_size,
+        getSampleBlock(),
+        context_global,
        ConnectionTimeouts::getHTTPTimeouts(context_global),
        IStorage::chooseCompressionMethod(uri.toString(), compression_method));
}
@@ -190,29 +212,35 @@ void registerStorageS3(StorageFactory & factory)
    {
        ASTs & engine_args = args.engine_args;

-        if (engine_args.size() != 2 && engine_args.size() != 3)
+        if (engine_args.size() < 2 || engine_args.size() > 5)
            throw Exception(
-                "Storage S3 requires 2 or 3 arguments: url, name of used format and compression_method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+                "Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

-        engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
+        for (size_t i = 0; i < engine_args.size(); ++i)
+            engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);

        String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
        Poco::URI uri(url);

-        engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);
-        String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
+        String format_name = engine_args[engine_args.size() - 1]->as<ASTLiteral &>().value.safeGet<String>();

+        String access_key_id;
+        String secret_access_key;
+        if (engine_args.size() >= 4)
+        {
+            access_key_id = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
+            secret_access_key = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
+        }

        UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;

        String compression_method;
-        if (engine_args.size() == 3)
-        {
-            engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
-            compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
-        }
-        else
-            compression_method = "auto";
+        if (engine_args.size() == 3 || engine_args.size() == 5)
+            compression_method = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
+        else
+            compression_method = "auto";

-        return StorageS3::create(uri, args.database_name, args.table_name, format_name, min_upload_part_size, args.columns, args.constraints, args.context);
+        return StorageS3::create(uri, access_key_id, secret_access_key, args.database_name, args.table_name, format_name, min_upload_part_size, args.columns, args.constraints, args.context);
    });
}

}
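Spelled out, the layouts the new error message describes are the following (URL, keys, format and compression values are placeholders; credentials are present exactly when there are 4 or 5 arguments):

    ENGINE = S3('https://bucket.s3.amazonaws.com/f.csv', 'CSV')
    ENGINE = S3('https://bucket.s3.amazonaws.com/f.csv', 'CSV', 'gzip')
    ENGINE = S3('https://bucket.s3.amazonaws.com/f.csv', 'access_key_id', 'secret_access_key', 'CSV')
    ENGINE = S3('https://bucket.s3.amazonaws.com/f.csv', 'access_key_id', 'secret_access_key', 'CSV', 'gzip')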

View File

@@ -18,8 +18,10 @@ class StorageS3 : public ext::shared_ptr_helper<StorageS3>, public IStorage
public:
    StorageS3(
        const Poco::URI & uri_,
-        const std::string & database_name_,
-        const std::string & table_name_,
+        const String & access_key_id,
+        const String & secret_access_key,
+        const String & database_name_,
+        const String & table_name_,
        const String & format_name_,
        UInt64 min_upload_part_size_,
        const ColumnsDescription & columns_,

@@ -56,6 +58,8 @@ public:
private:
    Poco::URI uri;
+    String access_key_id;
+    String secret_access_key;
    const Context & context_global;

    String format_name;
View File

@@ -1,17 +1,84 @@
#include <Storages/StorageS3.h>

+#include <Interpreters/evaluateConstantExpression.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
+#include <TableFunctions/parseColumnsListForTableFunction.h>
+#include <Parsers/ASTLiteral.h>
#include <Poco/URI.h>

namespace DB
{

+namespace ErrorCodes
+{
+    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+}
+
+StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
+{
+    /// Parse args
+    ASTs & args_func = ast_function->children;
+
+    if (args_func.size() != 1)
+        throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::LOGICAL_ERROR);
+
+    ASTs & args = args_func.at(0)->children;
+
+    if (args.size() < 3 || args.size() > 6)
+        throw Exception("Table function '" + getName() + "' requires 3 to 6 arguments: url, [access_key_id, secret_access_key,] format, structure and [compression_method].",
+            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+
+    for (size_t i = 0; i < args.size(); ++i)
+        args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context);
+
+    String filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
+    String format;
+    String structure;
+    String access_key_id;
+    String secret_access_key;
+
+    if (args.size() < 5)
+    {
+        format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
+        structure = args[2]->as<ASTLiteral &>().value.safeGet<String>();
+    }
+    else
+    {
+        access_key_id = args[1]->as<ASTLiteral &>().value.safeGet<String>();
+        secret_access_key = args[2]->as<ASTLiteral &>().value.safeGet<String>();
+        format = args[3]->as<ASTLiteral &>().value.safeGet<String>();
+        structure = args[4]->as<ASTLiteral &>().value.safeGet<String>();
+    }
+
+    String compression_method;
+    if (args.size() == 4 || args.size() == 6)
+        compression_method = args.back()->as<ASTLiteral &>().value.safeGet<String>();
+    else
+        compression_method = "auto";
+
+    ColumnsDescription columns = parseColumnsListFromString(structure, context);
+
+    /// Create table
+    StoragePtr storage = getStorage(filename, access_key_id, secret_access_key, format, columns, const_cast<Context &>(context), table_name, compression_method);
+
+    storage->startup();
+
+    return storage;
+}
+
StoragePtr TableFunctionS3::getStorage(
-    const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const
+    const String & source,
+    const String & access_key_id,
+    const String & secret_access_key,
+    const String & format,
+    const ColumnsDescription & columns,
+    Context & global_context,
+    const std::string & table_name,
+    const String & compression_method) const
{
    Poco::URI uri(source);
    UInt64 min_upload_part_size = global_context.getSettingsRef().s3_min_upload_part_size;
-    return StorageS3::create(uri, getDatabaseName(), table_name, format, min_upload_part_size, columns, ConstraintsDescription{}, global_context, compression_method);
+    return StorageS3::create(uri, access_key_id, secret_access_key, getDatabaseName(), table_name, format, min_upload_part_size, columns, ConstraintsDescription{}, global_context, compression_method);
}

void registerTableFunctionS3(TableFunctionFactory & factory)
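Spelled out, the invocation shapes this parsing accepts are the following (all argument values are placeholders; with fewer than 5 arguments, format and structure are arguments 2 and 3, otherwise the keys come first, and an even argument count means the last one is the compression method):

    s3('url', 'format', 'structure')
    s3('url', 'format', 'structure', 'gzip')
    s3('url', 'access_key_id', 'secret_access_key', 'format', 'structure')
    s3('url', 'access_key_id', 'secret_access_key', 'format', 'structure', 'gzip')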

View File

@@ -1,6 +1,6 @@
#pragma once

-#include <TableFunctions/ITableFunctionFileLike.h>
+#include <TableFunctions/ITableFunction.h>

namespace DB

@@ -8,9 +8,9 @@ namespace DB
class Context;

-/* s3(source, format, structure) - creates a temporary storage for a file in S3
+/* s3(source, [access_key_id, secret_access_key,] format, structure) - creates a temporary storage for a file in S3
 */
-class TableFunctionS3 : public ITableFunctionFileLike
+class TableFunctionS3 : public ITableFunction
{
public:
    static constexpr auto name = "s3";

@@ -20,13 +20,20 @@ public:
    }

private:
+    StoragePtr executeImpl(
+        const ASTPtr & ast_function,
+        const Context & context,
+        const std::string & table_name) const override;
+
    StoragePtr getStorage(
        const String & source,
+        const String & access_key_id,
+        const String & secret_access_key,
        const String & format,
        const ColumnsDescription & columns,
        Context & global_context,
        const std::string & table_name,
-        const String & compression_method) const override;
+        const String & compression_method) const;
};

}

View File

@@ -1,4 +1,5 @@
#!/usr/bin/env python
+from __future__ import print_function
import sys
import os
import os.path
@@ -72,6 +73,8 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
    while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None:
        sleep(0.01)

+    total_time = (datetime.now() - start_time).total_seconds()
+
    # Normalize randomized database names in stdout, stderr files.
    os.system("sed -i -e 's/{test_db}/default/g' {file}".format(test_db=args.database, file=stdout_file))
    os.system("sed -i -e 's/{test_db}/default/g' {file}".format(test_db=args.database, file=stderr_file))
@@ -81,7 +84,7 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
    stderr = open(stderr_file, 'r').read() if os.path.exists(stderr_file) else ''
    stderr = unicode(stderr, errors='replace', encoding='utf-8')

-    return proc, stdout, stderr
+    return proc, stdout, stderr, total_time

def need_retry(stderr):
@@ -149,6 +152,10 @@ def run_tests_array(all_tests_with_params):

    client_options = get_additional_client_options(args)

+    def print_test_time(test_time):
+        if args.print_time:
+            print(" {0:.2f} sec.".format(test_time), end='')
+
    if len(all_tests):
        print("\nRunning {} {} tests.".format(len(all_tests), suite) + "\n")
@@ -194,7 +201,7 @@ def run_tests_array(all_tests_with_params):
            stdout_file = os.path.join(suite_tmp_dir, name) + '.stdout'
            stderr_file = os.path.join(suite_tmp_dir, name) + '.stderr'

-            proc, stdout, stderr = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
+            proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
            if proc.returncode is None:
                try:
                    proc.kill()
@@ -203,11 +210,13 @@ def run_tests_array(all_tests_with_params):
                    raise

                failures += 1
-                print("{0} - Timeout!".format(MSG_FAIL))
+                print(MSG_FAIL, end='')
+                print_test_time(total_time)
+                print(" - Timeout!")
            else:
                counter = 1
                while proc.returncode != 0 and need_retry(stderr):
-                    proc, stdout, stderr = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
+                    proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
                    sleep(2**counter)
                    counter += 1
                    if counter > 6:
@@ -216,7 +225,9 @@ def run_tests_array(all_tests_with_params):
                if proc.returncode != 0:
                    failures += 1
                    failures_chain += 1
-                    print("{0} - return code {1}".format(MSG_FAIL, proc.returncode))
+                    print(MSG_FAIL, end='')
+                    print_test_time(total_time)
+                    print(" - return code {}".format(proc.returncode))

                    if stderr:
                        print(stderr.encode('utf-8'))
@@ -227,24 +238,34 @@ def run_tests_array(all_tests_with_params):
                elif stderr:
                    failures += 1
                    failures_chain += 1
-                    print("{0} - having stderror:\n{1}".format(MSG_FAIL, stderr.encode('utf-8')))
+                    print(MSG_FAIL, end='')
+                    print_test_time(total_time)
+                    print(" - having stderror:\n{}".format(stderr.encode('utf-8')))
                elif 'Exception' in stdout:
                    failures += 1
                    failures_chain += 1
-                    print("{0} - having exception:\n{1}".format(MSG_FAIL, stdout.encode('utf-8')))
+                    print(MSG_FAIL, end='')
+                    print_test_time(total_time)
+                    print(" - having exception:\n{}".format(stdout.encode('utf-8')))
                elif not os.path.isfile(reference_file):
-                    print("{0} - no reference file".format(MSG_UNKNOWN))
+                    print(MSG_UNKNOWN, end='')
+                    print_test_time(total_time)
+                    print(" - no reference file")
                else:
                    result_is_different = subprocess.call(['diff', '-q', reference_file, stdout_file], stdout = PIPE)

                    if result_is_different:
                        diff = Popen(['diff', '-U', str(args.unified), reference_file, stdout_file], stdout = PIPE).communicate()[0]
                        failures += 1
-                        print("{0} - result differs with reference:\n{1}".format(MSG_FAIL, diff))
+                        print(MSG_FAIL, end='')
+                        print_test_time(total_time)
+                        print(" - result differs with reference:\n{}".format(diff))
                    else:
                        passed_total += 1
                        failures_chain = 0
-                        print(MSG_OK)
+                        print(MSG_OK, end='')
+                        print_test_time(total_time)
+                        print()

            if os.path.exists(stdout_file):
                os.remove(stdout_file)
            if os.path.exists(stderr_file):
@@ -503,6 +524,7 @@ if __name__ == '__main__':
     parser.add_argument('--skip', nargs='+', help="Skip these tests")
     parser.add_argument('--no-long', action='store_false', dest='no_long', help='Do not run long tests')
     parser.add_argument('--client-option', nargs='+', help='Specify additional client argument')
+    parser.add_argument('--print-time', action='store_true', dest='print_time', help='Print test time')

     group=parser.add_mutually_exclusive_group(required=False)
     group.add_argument('--zookeeper', action='store_true', default=None, dest='zookeeper', help='Run zookeeper related tests')
     group.add_argument('--no-zookeeper', action='store_false', default=None, dest='zookeeper', help='Do not run zookeeper related tests')
View File
@@ -5,6 +5,9 @@ import pytest
 from helpers.cluster import ClickHouseCluster, ClickHouseInstance

+import helpers.client
+
 logging.getLogger().setLevel(logging.INFO)
 logging.getLogger().addHandler(logging.StreamHandler())
@@ -53,12 +56,18 @@ def prepare_s3_bucket(cluster):
     minio_client.set_bucket_policy(cluster.minio_bucket, json.dumps(bucket_read_write_policy))

+    cluster.minio_restricted_bucket = "{}-with-auth".format(cluster.minio_bucket)
+    if minio_client.bucket_exists(cluster.minio_restricted_bucket):
+        minio_client.remove_bucket(cluster.minio_restricted_bucket)
+    minio_client.make_bucket(cluster.minio_restricted_bucket)
+

 # Returns content of given S3 file as string.
-def get_s3_file_content(cluster, filename):
+def get_s3_file_content(cluster, bucket, filename):
     # type: (ClickHouseCluster, str) -> str
-    data = cluster.minio_client.get_object(cluster.minio_bucket, filename)
+    data = cluster.minio_client.get_object(bucket, filename)
     data_str = ""
     for chunk in data.stream():
         data_str += chunk
@@ -101,53 +110,76 @@ def run_query(instance, query, stdin=None, settings=None):

 # Test simple put.
-def test_put(cluster):
+@pytest.mark.parametrize("maybe_auth,positive", [
+    ("",True),
+    ("'minio','minio123',",True),
+    ("'wrongid','wrongkey',",False)
+])
+def test_put(cluster, maybe_auth, positive):
     # type: (ClickHouseCluster) -> None

+    bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
     instance = cluster.instances["dummy"]  # type: ClickHouseInstance
     table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
     values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
     values_csv = "1,2,3\n3,2,1\n78,43,45\n"
     filename = "test.csv"
-    put_query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
-        cluster.minio_host, cluster.minio_port, cluster.minio_bucket, filename, table_format, values)
+    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') values {}".format(
+        cluster.minio_host, cluster.minio_port, bucket, filename, maybe_auth, table_format, values)

-    run_query(instance, put_query)
-    assert values_csv == get_s3_file_content(cluster, filename)
+    try:
+        run_query(instance, put_query)
+    except helpers.client.QueryRuntimeException:
+        assert not positive
+    else:
+        assert positive
+        assert values_csv == get_s3_file_content(cluster, bucket, filename)
 # Test put values in CSV format.
-def test_put_csv(cluster):
+@pytest.mark.parametrize("maybe_auth,positive", [
+    ("",True),
+    ("'minio','minio123',",True),
+    ("'wrongid','wrongkey',",False)
+])
+def test_put_csv(cluster, maybe_auth, positive):
     # type: (ClickHouseCluster) -> None

+    bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
     instance = cluster.instances["dummy"]  # type: ClickHouseInstance
     table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
     filename = "test.csv"
-    put_query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') format CSV".format(
-        cluster.minio_host, cluster.minio_port, cluster.minio_bucket, filename, table_format)
+    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
+        cluster.minio_host, cluster.minio_port, bucket, filename, maybe_auth, table_format)
     csv_data = "8,9,16\n11,18,13\n22,14,2\n"

-    run_query(instance, put_query, stdin=csv_data)
-    assert csv_data == get_s3_file_content(cluster, filename)
+    try:
+        run_query(instance, put_query, stdin=csv_data)
+    except helpers.client.QueryRuntimeException:
+        assert not positive
+    else:
+        assert positive
+        assert csv_data == get_s3_file_content(cluster, bucket, filename)
 # Test put and get with S3 server redirect.
 def test_put_get_with_redirect(cluster):
     # type: (ClickHouseCluster) -> None

+    bucket = cluster.minio_bucket
     instance = cluster.instances["dummy"]  # type: ClickHouseInstance
     table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
     values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
     values_csv = "1,1,1\n1,1,1\n11,11,11\n"
     filename = "test.csv"
     query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
-        cluster.minio_redirect_host, cluster.minio_redirect_port, cluster.minio_bucket, filename, table_format, values)
+        cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format, values)
     run_query(instance, query)

-    assert values_csv == get_s3_file_content(cluster, filename)
+    assert values_csv == get_s3_file_content(cluster, bucket, filename)

     query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/{}', 'CSV', '{}')".format(
-        cluster.minio_redirect_host, cluster.minio_redirect_port, cluster.minio_bucket, filename, table_format)
+        cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format)
     stdout = run_query(instance, query)

     assert list(map(str.split, stdout.splitlines())) == [
@@ -158,9 +190,15 @@ def test_put_get_with_redirect(cluster):

 # Test multipart put.
-def test_multipart_put(cluster):
+@pytest.mark.parametrize("maybe_auth,positive", [
+    ("",True),
+    ("'minio','minio123',",True),
+    ("'wrongid','wrongkey',",False)
+])
+def test_multipart_put(cluster, maybe_auth, positive):
     # type: (ClickHouseCluster) -> None

+    bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
     instance = cluster.instances["dummy"]  # type: ClickHouseInstance
     table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
@@ -178,14 +216,19 @@ def test_multipart_put(cluster):
     assert len(csv_data) > min_part_size_bytes

     filename = "test_multipart.csv"
-    put_query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') format CSV".format(
-        cluster.minio_redirect_host, cluster.minio_redirect_port, cluster.minio_bucket, filename, table_format)
+    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
+        cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, maybe_auth, table_format)

-    run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes})
+    try:
+        run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes})
+    except helpers.client.QueryRuntimeException:
+        assert not positive
+    else:
+        assert positive

     # Use Nginx access logs to count number of parts uploaded to Minio.
     nginx_logs = get_nginx_access_logs()
     uploaded_parts = filter(lambda log_line: log_line.find(filename) >= 0 and log_line.find("PUT") >= 0, nginx_logs)
     assert uploaded_parts > 1

-    assert csv_data == get_s3_file_content(cluster, filename)
+    assert csv_data == get_s3_file_content(cluster, bucket, filename)
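For orientation, the query text that these parametrized tests assemble with `maybe_auth = "'minio','minio123',"` expands to an authenticated `s3` table function call along the following lines. This is a sketch of the generated statement, not code from the patch: the host, port, and bucket literals stand in for the MinIO fixture values.

```sql
-- Sketch of the generated statement; endpoint and bucket names are assumed.
insert into table function s3(
    'http://minio-host:9001/bucket-with-auth/test.csv',
    'minio', 'minio123', 'CSV',
    'column1 UInt32, column2 UInt32, column3 UInt32')
values (1, 2, 3), (3, 2, 1), (78, 43, 45)
```

With the `'wrongid','wrongkey',` parameter set, the same statement is expected to fail with a `QueryRuntimeException`, which is why each test asserts `not positive` in its `except` branch.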
View File
@@ -0,0 +1,22 @@
+<test>
+    <type>loop</type>
+
+    <stop_conditions>
+        <all_of>
+            <iterations>3</iterations>
+            <min_time_not_changing_for_ms>10000</min_time_not_changing_for_ms>
+        </all_of>
+        <any_of>
+            <iterations>5</iterations>
+            <total_time_ms>60000</total_time_ms>
+        </any_of>
+    </stop_conditions>
+
+    <main_metric>
+        <min_time/>
+    </main_metric>
+    <!-- 100 AND operands -->
+    <query>select count() from numbers(10000000) where number != 96594 AND number != 18511 AND number != 98085 AND number != 84177 AND number != 70314 AND number != 28083 AND number != 54202 AND number != 66522 AND number != 66939 AND number != 99469 AND number != 65776 AND number != 22876 AND number != 42151 AND number != 19924 AND number != 66681 AND number != 63022 AND number != 17487 AND number != 83914 AND number != 59754 AND number != 968 AND number != 73334 AND number != 68569 AND number != 49853 AND number != 33155 AND number != 31777 AND number != 99698 AND number != 26708 AND number != 76409 AND number != 42191 AND number != 55397 AND number != 25724 AND number != 39170 AND number != 22728 AND number != 98238 AND number != 86052 AND number != 12756 AND number != 13948 AND number != 57774 AND number != 82511 AND number != 11337 AND number != 23506 AND number != 11875 AND number != 58536 AND number != 56919 AND number != 25986 AND number != 80710 AND number != 61797 AND number != 99244 AND number != 11665 AND number != 15758 AND number != 82899 AND number != 63150 AND number != 7198 AND number != 40071 AND number != 46310 AND number != 78488 AND number != 9273 AND number != 91878 AND number != 57904 AND number != 53941 AND number != 75675 AND number != 12093 AND number != 50090 AND number != 59675 AND number != 41632 AND number != 81448 AND number != 46821 AND number != 51919 AND number != 49028 AND number != 71059 AND number != 15673 AND number != 6132 AND number != 15473 AND number != 32527 AND number != 63842 AND number != 33121 AND number != 53271 AND number != 86033 AND number != 96807 AND number != 4791 AND number != 80089 AND number != 51616 AND number != 46311 AND number != 82844 AND number != 59353 AND number != 63538 AND number != 64857 AND number != 58471 AND number != 29870 AND number != 80209 AND number != 61000 AND number != 75991 AND number != 44506 AND number != 11283 AND number != 6335 AND number != 73502 AND number != 22354 AND number != 72816 AND number != 66399 AND number != 61703</query>
+    <!-- 10 AND operands -->
+    <query>select count() from numbers(10000000) where number != 96594 AND number != 18511 AND number != 98085 AND number != 84177 AND number != 70314 AND number != 28083 AND number != 54202 AND number != 66522 AND number != 66939 AND number != 99469</query>
+</test>
View File
@@ -2,7 +2,7 @@ drop table if exists test_table_s3_syntax
 ;
 create table test_table_s3_syntax (id UInt32) ENGINE = S3('')
 ; -- { serverError 42 }
-create table test_table_s3_syntax (id UInt32) ENGINE = S3('','','','')
+create table test_table_s3_syntax (id UInt32) ENGINE = S3('','','','','','')
 ; -- { serverError 42 }
 drop table if exists test_table_s3_syntax
 ;
View File
@@ -130,6 +130,17 @@ Possible values:

 Default value: 0.

+## max_http_get_redirects {#setting-max_http_get_redirects}
+
+Limits the maximum number of HTTP GET redirect hops for [URL](../table_engines/url.md)-engine tables. The setting applies to both types of tables: those created by the [CREATE TABLE](../../query_language/create.md#create-table-query) query and those created by the [url](../../query_language/table_functions/url.md) table function.
+
+Possible values:
+
+- Positive integer number of hops.
+- 0 — Unlimited number of hops.
+
+Default value: 0.
+
 ## input_format_allow_errors_num {#settings-input_format_allow_errors_num}

 Sets the maximum number of acceptable errors when reading from text formats (CSV, TSV, etc.).
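An illustrative use of the `max_http_get_redirects` setting documented in the hunk above; the table name, endpoint, and hop limit in this sketch are assumptions, not values from the patch:

```sql
-- Hypothetical source whose endpoint answers with HTTP redirects.
CREATE TABLE redirecting_source (data String) ENGINE = URL('http://example.com/data.tsv', TSV);

-- Permit at most 3 redirect hops for the query below;
-- with the default of 0, the number of hops is unlimited.
SET max_http_get_redirects = 3;

SELECT * FROM redirecting_source;
```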
View File
@@ -17,6 +17,8 @@ additional headers for getting a response from the server.
 respectively. For processing `POST` requests, the remote server must support
 [Chunked transfer encoding](https://en.wikipedia.org/wiki/Chunked_transfer_encoding).

+You can limit the maximum number of HTTP GET redirect hops with the [max_http_get_redirects](../settings/settings.md#setting-max_http_get_redirects) setting.
+
 **Example:**

 **1.** Create a `url_engine_table` table on the server:
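A minimal sketch of such a table definition, assuming a local HTTP endpoint and a two-column layout rather than quoting the documentation's own example body (which lies outside this hunk), might read:

```sql
-- Illustrative only: serve CSV rows at the given address before querying.
CREATE TABLE url_engine_table (word String, value UInt64)
ENGINE = URL('http://127.0.0.1:12345/', CSV)
```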
View File
@@ -215,7 +215,8 @@ nav:
     - 'Overview of ClickHouse Architecture': 'development/architecture.md'
     - 'How to Build ClickHouse on Linux': 'development/build.md'
     - 'How to Build ClickHouse on Mac OS X': 'development/build_osx.md'
-    - 'How to Build ClickHouse on Linux for Mac OS X': 'development/build_cross.md'
+    - 'How to Build ClickHouse on Linux for Mac OS X': 'development/build_cross_osx.md'
+    - 'How to Build ClickHouse on Linux for AARCH64 (ARM64)': 'development/build_cross_arm.md'
     - 'How to Write C++ Code': 'development/style.md'
     - 'How to Run ClickHouse Tests': 'development/tests.md'
     - 'The Beginner ClickHouse Developer Instruction': 'development/developer_instruction.md'