Merge branch 'master' into dictionary-invalidate-query

Nikolai Kochetov 2017-05-23 13:27:31 +03:00
commit f0ec7901e9
54 changed files with 2239 additions and 223 deletions

View File

@@ -606,5 +606,13 @@ void Block::unshareColumns()
}
}
+ void Block::updateHash(SipHash & hash) const
+ {
+ for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no)
+ {
+ for (auto & col : getColumns())
+ col.column->updateHashWithValue(row_no, hash);
+ }
+ }
}

View File

@@ -119,6 +119,12 @@ public:
*/
void unshareColumns();
+ /** Updates SipHash of the Block, using update method of columns.
+ * Returns hash for block, that could be used to differentiate blocks
+ * with same structure, but different data.
+ */
+ void updateHash(SipHash & hash) const;
private:
void eraseImpl(size_t position);
void initializeIndexByName();

View File

@@ -177,7 +177,7 @@ namespace ErrorCodes
extern const int TOO_BIG_AST = 168;
extern const int BAD_TYPE_OF_FIELD = 169;
extern const int BAD_GET = 170;
- extern const int BLOCKS_HAS_DIFFERENT_STRUCTURE = 171;
+ extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE = 171;
extern const int CANNOT_CREATE_DIRECTORY = 172;
extern const int CANNOT_ALLOCATE_MEMORY = 173;
extern const int CYCLIC_ALIASES = 174;

View File

@@ -12,6 +12,7 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_DATE;
extern const int CANNOT_PARSE_DATETIME;
extern const int CANNOT_READ_ARRAY_FROM_TEXT;
+ extern const int CANNOT_PARSE_NUMBER;
}
@@ -33,7 +34,8 @@ static bool isParseError(int code)
|| code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING
|| code == ErrorCodes::CANNOT_PARSE_DATE
|| code == ErrorCodes::CANNOT_PARSE_DATETIME
- || code == ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT;
+ || code == ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT
+ || code == ErrorCodes::CANNOT_PARSE_NUMBER;
}

View File

@@ -55,17 +55,17 @@ Block CastTypeBlockInputStream::readImpl()
if (it == cast_description.end())
{
+ // Leave the same column
res.insert(src_column);
}
else
{
CastElement & cast_element = it->second;
size_t tmp_col = cast_element.tmp_col_offset;
- ColumnNumbers arguments{tmp_col, tmp_col + 1};
- tmp_conversion_block.getByPosition(tmp_col).column = src_column.column;
- cast_element.function->execute(tmp_conversion_block, arguments, tmp_col + 2);
+ tmp_conversion_block.getByPosition(tmp_col).column = src_column.column;
+ cast_element.function->execute(tmp_conversion_block, ColumnNumbers{tmp_col, tmp_col + 1}, tmp_col + 2);
res.insert(tmp_conversion_block.getByPosition(tmp_col + 2));
}
}
@@ -93,22 +93,24 @@ void CastTypeBlockInputStream::initialize(const Block & src_block)
/// Force conversion if source and destination types is different.
if (!ref_column.type->equals(*src_column.type))
{
- ColumnWithTypeAndName src_columnn_copy = src_column.cloneEmpty();
- ColumnWithTypeAndName alias_column(std::make_shared<ColumnConstString>(1, ref_column.type->getName()), std::make_shared<DataTypeString>(), "");
- ColumnWithTypeAndName result_column(nullptr, ref_column.type->clone(), src_column.name);
+ ColumnWithTypeAndName res_type_name_column(std::make_shared<ColumnConstString>(1, ref_column.type->getName()), std::make_shared<DataTypeString>(), "");
+ ColumnWithTypeAndName res_blank_column(nullptr, ref_column.type->clone(), src_column.name);
+ /// Prepares function to execution
+ auto cast_function = FunctionFactory::instance().get("CAST", context);
+ {
DataTypePtr unused_return_type;
std::vector<ExpressionAction> unused_prerequisites;
- ColumnsWithTypeAndName arguments{src_columnn_copy, alias_column};
- /// Prepares function to execution. TODO It is not obvious.
- auto cast_function = FunctionFactory::instance().get("CAST", context);
+ ColumnsWithTypeAndName arguments{src_column, res_type_name_column};
cast_function->getReturnTypeAndPrerequisites(arguments, unused_return_type, unused_prerequisites);
+ }
+ /// Prefill arguments and result column for current CAST
tmp_conversion_block.insert(src_column);
- tmp_conversion_block.insert(alias_column);
- tmp_conversion_block.insert(result_column);
+ tmp_conversion_block.insert(res_type_name_column);
+ tmp_conversion_block.insert(res_blank_column);
+ /// Index of src_column blank in tmp_conversion_block
size_t tmp_col_offset = cast_description.size() * 3;
cast_description.emplace(src_col, CastElement(std::move(cast_function), tmp_col_offset));
}

View File

@@ -27,20 +27,26 @@ private:
const Context & context;
Block ref_defenition;
+ /// Initializes cast_description and prepares tmp_conversion_block
void initialize(const Block & src_block);
bool initialized = false;
struct CastElement
{
+ /// Prepared function to do conversion
std::shared_ptr<IFunction> function;
+ /// Position of first function argument in tmp_conversion_block
size_t tmp_col_offset;
CastElement(std::shared_ptr<IFunction> && function_, size_t tmp_col_offset_);
};
/// Describes required conversions on source block
+ /// Contains column numbers in source block that should be converted
std::map<size_t, CastElement> cast_description;
- /// Auxiliary block, stores arguments and results of required CAST calls
+ /// Auxiliary block, stores prefilled arguments and result for each CAST function in cast_description
+ /// 3 columns are allocated for each conversion: [blank of source column, column with res type name, blank of res column]
Block tmp_conversion_block;
};

View File

@@ -155,9 +155,11 @@ static BlockOutputStreamPtr getOutputImpl(const String & name, WriteBuffer & buf
else if (name == "PrettySpaceNoEscapes")
return std::make_shared<PrettySpaceBlockOutputStream>(buf, true, settings.output_format_pretty_max_rows, context);
else if (name == "Vertical")
- return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<VerticalRowOutputStream>(buf, sample, context));
+ return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<VerticalRowOutputStream>(
+ buf, sample, settings.output_format_pretty_max_rows, context));
else if (name == "VerticalRaw")
- return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<VerticalRawRowOutputStream>(buf, sample, context));
+ return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<VerticalRawRowOutputStream>(
+ buf, sample, settings.output_format_pretty_max_rows, context));
else if (name == "Values")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<ValuesRowOutputStream>(buf));
else if (name == "JSON")

View File

@@ -124,7 +124,7 @@ bool JSONEachRowRowInputStream::read(Block & block)
}
skipWhitespaceIfAny(istr);
- if (!istr.eof() && *istr.position() == ',')
+ if (!istr.eof() && (*istr.position() == ',' || *istr.position() == ';')) /// Semicolon is added for convenience as it could be used at end of INSERT query.
++istr.position();
/// Fill non-visited columns with the default values.

View File

@@ -11,7 +11,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
- extern const int BLOCKS_HAS_DIFFERENT_STRUCTURE;
+ extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
@@ -130,7 +130,7 @@ void MergingSortedBlockInputStream::init(Block & merged_block, ColumnPlainPtrs &
{
throw Exception("Merging blocks has different names or types of columns:\n"
+ shared_block_ptr->dumpStructure() + "\nand\n" + merged_block.dumpStructure(),
- ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE);
+ ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
}
}
}

View File

@@ -10,6 +10,7 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
extern const int CANNOT_PARSE_ESCAPE_SEQUENCE;
extern const int CANNOT_READ_ALL_DATA;
+ extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
}
@@ -108,6 +109,7 @@ bool TSKVRowInputStream::read(Block & block)
{
StringRef name_ref;
bool has_value = readName(istr, name_ref, name_buf);
+ ssize_t index = -1;
if (has_value)
{
@@ -126,7 +128,7 @@ bool TSKVRowInputStream::read(Block & block)
}
else
{
- size_t index = it->second;
+ index = it->second;
if (read_columns[index])
throw Exception("Duplicate field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
@@ -159,7 +161,16 @@ bool TSKVRowInputStream::read(Block & block)
break;
}
else
- throw Exception("Found garbage after field in TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
+ {
+ /// Possibly garbage was written into the column; remove it
+ if (index >= 0)
+ {
+ block.getByPosition(index).column->popBack(1);
+ read_columns[index] = false;
+ }
+ throw Exception("Found garbage after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
+ }
}
}

View File

@@ -10,8 +10,9 @@
namespace DB
{
- VerticalRowOutputStream::VerticalRowOutputStream(WriteBuffer & ostr_, const Block & sample_, const Context & context)
- : ostr(ostr_), sample(sample_)
+ VerticalRowOutputStream::VerticalRowOutputStream(
+ WriteBuffer & ostr_, const Block & sample_, size_t max_rows_, const Context & context)
+ : ostr(ostr_), sample(sample_), max_rows(max_rows_)
{
size_t columns = sample.columns();
@@ -60,6 +61,9 @@ void VerticalRowOutputStream::flush()
void VerticalRowOutputStream::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{
+ if (row_number > max_rows)
+ return;
writeString(names_and_paddings[field_number], ostr);
writeValue(column, type, row_num);
writeChar('\n', ostr);
@@ -82,6 +86,10 @@ void VerticalRawRowOutputStream::writeValue(const IColumn & column, const IDataT
void VerticalRowOutputStream::writeRowStartDelimiter()
{
++row_number;
+ if (row_number > max_rows)
+ return;
writeCString("Row ", ostr);
writeIntText(row_number, ostr);
writeCString(":\n", ostr);
@@ -95,9 +103,77 @@ void VerticalRowOutputStream::writeRowStartDelimiter()
void VerticalRowOutputStream::writeRowBetweenDelimiter()
{
+ if (row_number > max_rows)
+ return;
writeCString("\n", ostr);
field_number = 0;
}
void VerticalRowOutputStream::writeSuffix()
{
if (row_number > max_rows)
{
writeCString("Showed first ", ostr);
writeIntText(max_rows, ostr);
writeCString(".\n", ostr);
}
if (totals || extremes)
{
writeCString("\n", ostr);
writeTotals();
writeExtremes();
}
}
void VerticalRowOutputStream::writeSpecialRow(const Block & block, size_t row_num, const char * title)
{
writeCString("\n", ostr);
row_number = 0;
field_number = 0;
size_t columns = block.columns();
writeCString(title, ostr);
writeCString(":\n", ostr);
size_t width = strlen(title) + 1;
for (size_t i = 0; i < width; ++i)
writeCString("", ostr);
writeChar('\n', ostr);
for (size_t i = 0; i < columns; ++i)
{
if (i != 0)
writeFieldDelimiter();
auto & col = block.getByPosition(i);
writeField(*col.column.get(), *col.type.get(), row_num);
}
}
void VerticalRowOutputStream::writeTotals()
{
if (totals)
{
writeSpecialRow(totals, 0, "Totals");
}
}
void VerticalRowOutputStream::writeExtremes()
{
if (extremes)
{
writeSpecialRow(extremes, 0, "Min");
writeSpecialRow(extremes, 1, "Max");
}
}
}

View File

@@ -18,24 +18,37 @@ class Context;
class VerticalRowOutputStream : public IRowOutputStream
{
public:
- VerticalRowOutputStream(WriteBuffer & ostr_, const Block & sample_, const Context & context);
+ VerticalRowOutputStream(WriteBuffer & ostr_, const Block & sample_, size_t max_rows_, const Context & context);
void writeField(const IColumn & column, const IDataType & type, size_t row_num) override;
void writeRowStartDelimiter() override;
void writeRowBetweenDelimiter() override;
+ void writeSuffix() override;
void flush() override;
+ void setTotals(const Block & totals_) override { totals = totals_; }
+ void setExtremes(const Block & extremes_) override { extremes = extremes_; }
protected:
virtual void writeValue(const IColumn & column, const IDataType & type, size_t row_num) const;
+ void writeTotals();
+ void writeExtremes();
+ /// For totals and extremes.
+ void writeSpecialRow(const Block & block, size_t row_num, const char * title);
WriteBuffer & ostr;
const Block sample;
+ size_t max_rows;
size_t field_number = 0;
size_t row_number = 0;
using NamesAndPaddings = std::vector<String>;
NamesAndPaddings names_and_paddings;
+ Block totals;
+ Block extremes;
};
@@ -44,8 +57,7 @@ protected:
class VerticalRawRowOutputStream final : public VerticalRowOutputStream
{
public:
- VerticalRawRowOutputStream(WriteBuffer & ostr_, const Block & sample_, const Context & context)
- : VerticalRowOutputStream(ostr_, sample_, context) {}
+ using VerticalRowOutputStream::VerticalRowOutputStream;
protected:
void writeValue(const IColumn & column, const IDataType & type, size_t row_num) const override;

View File

@@ -31,24 +31,13 @@ public:
bool operator<(const Part & rhs) const
{
- if (month != rhs.month)
- return month < rhs.month;
- if (left != rhs.left)
- return left < rhs.left;
- if (right != rhs.right)
- return right < rhs.right;
- if (level != rhs.level)
- return level < rhs.level;
- return false;
+ return std::tie(month, left, right, level) < std::tie(rhs.month, rhs.left, rhs.right, rhs.level);
}
- /// Contains another part (obtained after combining another part with some other)
+ /// Contains another part (obtained after merging another part with some other)
bool contains(const Part & rhs) const
{
- return month == rhs.month /// Parts for different months are not combined
+ return month == rhs.month /// Parts for different months are not merged
&& left_date <= rhs.left_date
&& right_date >= rhs.right_date
&& left <= rhs.left

View File

@@ -39,7 +39,7 @@ namespace DB
namespace ErrorCodes
{
extern const int INFINITE_LOOP;
- extern const int BLOCKS_HAS_DIFFERENT_STRUCTURE;
+ extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
@@ -198,7 +198,7 @@ static void appendBlock(const Block & from, Block & to)
if (col_from.getName() != col_to.getName())
throw Exception("Cannot append block to another: different type of columns at index " + toString(column_no)
- + ". Block 1: " + from.dumpStructure() + ". Block 2: " + to.dumpStructure(), ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE);
+ + ". Block 1: " + from.dumpStructure() + ". Block 2: " + to.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
col_to.insertRangeFrom(col_from, 0, rows);
}

View File

@@ -19,6 +19,7 @@
#include <Storages/StorageStripeLog.h>
#include <Storages/StorageMemory.h>
#include <Storages/StorageBuffer.h>
+ #include <Storages/StorageTrivialBuffer.h>
#include <Storages/StorageNull.h>
#include <Storages/StorageMerge.h>
#include <Storages/StorageMergeTree.h>
@@ -556,6 +557,54 @@ StoragePtr StorageFactory::get(
num_buckets, {min_time, min_rows, min_bytes}, {max_time, max_rows, max_bytes},
destination_database, destination_table);
}
else if (name == "TrivialBuffer")
{
/** TrivialBuffer(db, table, num_blocks_to_deduplicate, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes, path_in_zookeeper)
*
* db, table - in which table to put data from buffer.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for pushing out from the buffer.
* num_blocks_to_deduplicate - number of recently inserted blocks whose hashes are kept (under path_in_zookeeper) for deduplication.
* (See the usage sketch after this file's diff.)
*/
const std::string error_message_argument_number_mismatch = "Storage TrivialBuffer requires 10 parameters: "
" destination database, destination table, num_blocks_to_deduplicate, min_time, max_time, min_rows,"
" max_rows, min_bytes, max_bytes, path_in_zookeeper.";
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
if (args_func.size() != 1)
throw Exception(error_message_argument_number_mismatch,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 10)
throw Exception(error_message_argument_number_mismatch,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
args[0] = evaluateConstantExpressionOrIdentidierAsLiteral(args[0], local_context);
args[1] = evaluateConstantExpressionOrIdentidierAsLiteral(args[1], local_context);
String destination_database = static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>();
String destination_table = static_cast<const ASTLiteral &>(*args[1]).value.safeGet<String>();
size_t num_blocks_to_deduplicate = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[2]).value);
time_t min_time = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[3]).value);
time_t max_time = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[4]).value);
size_t min_rows = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[5]).value);
size_t max_rows = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[6]).value);
size_t min_bytes = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[7]).value);
size_t max_bytes = applyVisitor(FieldVisitorConvertToNumber<size_t>(), typeid_cast<ASTLiteral &>(*args[8]).value);
String path_in_zk_for_deduplication = static_cast<const ASTLiteral &>(*args[9]).value.safeGet<String>();
return StorageTrivialBuffer::create(
table_name, columns,
materialized_columns, alias_columns, column_defaults,
context, num_blocks_to_deduplicate, path_in_zk_for_deduplication,
{min_time, min_rows, min_bytes}, {max_time, max_rows, max_bytes},
destination_database, destination_table);
}
else if (endsWith(name, "MergeTree"))
{
/** [Replicated][|Summing|Collapsing|Aggregating|Unsorted|Replacing|Graphite]MergeTree (2 * 7 combinations) engines
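The following is a hedged usage sketch, not part of the commit, showing how the TrivialBuffer branch above might be exercised through the integration-test helpers added later in this change set. The database, table, threshold values and ZooKeeper path are invented for illustration; the engine argument order simply follows the 10-parameter comment above.

# Hypothetical usage sketch of the TrivialBuffer engine parameters documented above.
# `instance` is assumed to be a started ClickHouseInstance (helpers/cluster.py) created
# with with_zookeeper=True, since the engine keeps deduplication hashes in ZooKeeper.
def create_trivial_buffer_example(instance):
    instance.query('CREATE DATABASE IF NOT EXISTS test')
    instance.query('CREATE TABLE test.dst (x UInt64) ENGINE = Log')
    # TrivialBuffer(db, table, num_blocks_to_deduplicate,
    #               min_time, max_time, min_rows, max_rows, min_bytes, max_bytes,
    #               path_in_zookeeper)
    instance.query(
        "CREATE TABLE test.buf (x UInt64) ENGINE = TrivialBuffer("
        "'test', 'dst', 10000, 10, 100, 10000, 1000000, 10000000, 100000000, "
        "'/clickhouse/tables/test/buf')")
    instance.query('INSERT INTO test.buf VALUES (1)')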

View File

@@ -0,0 +1,561 @@
#include <Storages/StorageTrivialBuffer.h>
#include <Databases/IDatabase.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/CurrentMetrics.h>
#include <common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Poco/Ext/ThreadNumber.h>
#include <ext/range.hpp>
namespace ProfileEvents
{
extern const Event StorageBufferFlush;
extern const Event StorageBufferErrorOnFlush;
extern const Event StorageBufferPassedAllMinThresholds;
extern const Event StorageBufferPassedTimeMaxThreshold;
extern const Event StorageBufferPassedRowsMaxThreshold;
extern const Event StorageBufferPassedBytesMaxThreshold;
}
namespace CurrentMetrics
{
extern const Metric StorageBufferRows;
extern const Metric StorageBufferBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int INFINITE_LOOP;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
StoragePtr StorageTrivialBuffer::create(const std::string & name_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
Context & context_, const size_t num_blocks_to_deduplicate_,
const String & path_in_zk_for_deduplication_,
const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_)
{
return make_shared(
name_, columns_, materialized_columns_, alias_columns_, column_defaults_,
context_, num_blocks_to_deduplicate_, path_in_zk_for_deduplication_,
min_thresholds_, max_thresholds_,
destination_database_, destination_table_);
}
StorageTrivialBuffer::StorageTrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
Context & context_, const size_t num_blocks_to_deduplicate_,
const String & path_in_zk_for_deduplication_,
const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
name(name_), columns(columns_), context(context_),
num_blocks_to_deduplicate(num_blocks_to_deduplicate_),
path_in_zk_for_deduplication(path_in_zk_for_deduplication_),
zookeeper(context.getZooKeeper()),
deduplication_controller(num_blocks_to_deduplicate, zookeeper, path_in_zk_for_deduplication),
min_thresholds(min_thresholds_), max_thresholds(max_thresholds_),
destination_database(destination_database_), destination_table(destination_table_),
no_destination(destination_database.empty() && destination_table.empty()),
log(&Logger::get("TrivialBuffer (" + name + ")")),
flush_thread(&StorageTrivialBuffer::flushThread, this)
{
zookeeper->createAncestors(path_in_zk_for_deduplication);
zookeeper->createOrUpdate(path_in_zk_for_deduplication, {}, zkutil::CreateMode::Persistent);
}
class TrivialBufferBlockInputStream : public IProfilingBlockInputStream
{
public:
TrivialBufferBlockInputStream(const Names & column_names_, BlocksList::iterator begin_,
BlocksList::iterator end_, StorageTrivialBuffer & buffer_)
: column_names(column_names_), buffer(buffer_),
begin(begin_), end(end_), it(begin_) {}
String getName() const { return "TrivialStorageBuffer"; }
String getID() const
{
std::stringstream res;
res << "TrivialStorageBuffer(" << &buffer;
for (const auto & name : column_names)
res << ", " << name;
res << ")";
return res.str();
}
protected:
Block readImpl()
{
Block res;
if (it == end)
return res;
for (const auto & column : column_names)
res.insert(it->getByName(column));
++it;
return res;
}
private:
Names column_names;
StorageTrivialBuffer & buffer;
BlocksList::iterator begin, end, it;
};
BlockInputStreams StorageTrivialBuffer::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
BlockInputStreams streams;
if (!no_destination)
{
auto destination = context.getTable(destination_database, destination_table);
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.",
ErrorCodes::INFINITE_LOOP);
/** TrivialStorageBuffer does not support 'PREWHERE',
* so turn off corresponding optimization.
*/
Settings modified_settings = settings;
modified_settings.optimize_move_to_prewhere = false;
streams = destination->read(column_names, query, context, modified_settings,
processed_stage, max_block_size, threads);
}
BlockInputStreams streams_from_buffers;
std::lock_guard<std::mutex> lock(mutex);
size_t size = data.size();
if (threads > size)
threads = size;
for (size_t thread = 0; thread < threads; ++thread)
{
BlocksList::iterator begin = data.begin();
BlocksList::iterator end = data.begin();
std::advance(begin, thread * size / threads);
std::advance(end, (thread + 1) * size / threads);
streams_from_buffers.push_back(std::make_shared<TrivialBufferBlockInputStream>(column_names, begin, end, *this));
}
/** If sources from destination table are already processed to non-starting stage, then we should wrap
* sources from the buffer to the same stage of processing conveyor.
*/
if (processed_stage > QueryProcessingStage::FetchColumns)
for (auto & stream : streams_from_buffers)
stream = InterpreterSelectQuery(query, context, processed_stage, 0, stream).execute().in;
streams.insert(streams.end(), streams_from_buffers.begin(), streams_from_buffers.end());
return streams;
}
template <typename DeduplicatioController>
void StorageTrivialBuffer::addBlock(const Block & block, DeduplicatioController & deduplication_controller)
{
SipHash hash;
block.updateHash(hash);
typename DeduplicatioController::HashType block_hash = DeduplicatioController::getHashFrom(hash);
std::lock_guard<std::mutex> lock(mutex);
if (!deduplication_controller.contains(block_hash))
{
deduplication_controller.insert(block_hash);
current_rows += block.rows();
current_bytes += block.bytes();
data.push_back(block);
CurrentMetrics::add(CurrentMetrics::StorageBufferRows, current_rows);
CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, current_bytes);
}
else
{
deduplication_controller.updateOnDeduplication(block_hash);
}
}
void StorageTrivialBuffer::flush(bool check_thresholds, bool is_called_from_background)
{
Block block_to_write;
time_t current_time = time(0);
time_t time_passed = 0;
if (data.empty())
return;
BlocksList::iterator flush_begin, flush_end;
{
std::unique_lock<std::mutex> lock(mutex, std::try_to_lock_t());
if (!lock.owns_lock())
{
// NOTE: is this the behavior we expect from 'flush' concurrency?
if (!is_called_from_background)
LOG_ERROR(log, "Method \'StorageTrivialBuffer::flush\' was called simultaneously from different threads");
return;
}
if (first_write_time)
time_passed = current_time - first_write_time;
if (check_thresholds)
{
if (!checkThresholdsImpl(current_rows, current_bytes, time_passed))
return;
}
else
{
if (current_rows == 0)
return;
}
flush_begin = data.begin();
flush_end = std::prev(data.end());
block_to_write = flush_begin->cloneEmpty();
}
/// Collecting BlockList into single block.
block_to_write.checkNumberOfRows();
flush_end = std::next(flush_end);
for (auto block = flush_begin; block != flush_end; ++block)
{
block->checkNumberOfRows();
for (size_t column_no = 0, columns = block->columns(); column_no < columns; ++column_no)
{
IColumn & col_to = *block_to_write.safeGetByPosition(column_no).column.get();
const IColumn & col_from = *block->getByName(col_to.getName()).column.get();
col_to.insertRangeFrom(col_from, 0, block->rows());
}
}
first_write_time = 0;
ProfileEvents::increment(ProfileEvents::StorageBufferFlush);
LOG_TRACE(log, "Flushing buffer with " << block_to_write.rows() << " rows, " << block_to_write.bytes() << " bytes, age " << time_passed << " seconds.");
if (no_destination)
return;
try
{
writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table));
data.erase(flush_begin, flush_end);
CurrentMetrics::sub(CurrentMetrics::StorageBufferRows, block_to_write.rows());
CurrentMetrics::sub(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());
}
catch (...)
{
ProfileEvents::increment(ProfileEvents::StorageBufferErrorOnFlush);
if (!first_write_time)
first_write_time = current_time;
/// We'll retry to write in a moment.
throw;
}
}
class TrivialBufferBlockOutputStream : public IBlockOutputStream
{
public:
TrivialBufferBlockOutputStream(StorageTrivialBuffer & buffer_) : buffer(buffer_) {}
void write(const Block & block) override
{
if (!block)
return;
size_t rows = block.rows();
size_t bytes = block.bytes();
if (!rows)
return;
StoragePtr destination;
if (!buffer.no_destination)
{
destination = buffer.context.tryGetTable(buffer.destination_database,
buffer.destination_table);
if (destination)
{
if (destination.get() == &buffer)
throw Exception("Destination table is myself. Write will "
"cause infinite loop.", ErrorCodes::INFINITE_LOOP);
try
{
destination->check(block, true);
}
catch (Exception & e)
{
e.addMessage("(when looking at destination table "
+ buffer.destination_database + "."
+ buffer.destination_table + ")");
throw;
}
}
}
time_t current_time = time(0);
if (buffer.checkThresholds(current_time, rows, bytes))
{
/** We'll try to flush the buffer if its thresholds are exceeded.
* This avoids unbounded memory consumption, because if we fail to write
* the data down to the destination table, we throw an exception and
* the new block is not appended to the buffer.
*/
buffer.flush(true);
}
if (!buffer.first_write_time)
buffer.first_write_time = current_time;
buffer.addBlock/*<StorageTrivialBuffer::ZookeeperDeduplicationController>*/(block, buffer.deduplication_controller);
}
private:
StorageTrivialBuffer & buffer;
};
BlockOutputStreamPtr StorageTrivialBuffer::write(const ASTPtr & query, const Settings & settings)
{
return std::make_shared<TrivialBufferBlockOutputStream>(*this);
}
void StorageTrivialBuffer::shutdown()
{
shutdown_event.set();
if (flush_thread.joinable())
flush_thread.join();
try
{
flush(false);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/** NOTE If you do OPTIMIZE after insertion,
* it is not guaranteed that all the data will be in the destination table at the time of
* the next SELECT just after OPTIMIZE.
*
* This is because, if a flush was already running, the call to flush inside OPTIMIZE
* will see an empty buffer and return quickly, while the already running flush
* is possibly not finished yet, so the next SELECT may observe missing data.
*
* This kind of race condition makes it very hard to implement proper tests.
*/
bool StorageTrivialBuffer::optimize(const String & partition, bool final, bool deduplicate, const Settings & settings)
{
if (!partition.empty())
throw Exception("Partition cannot be specified when optimizing table of type TrivialBuffer",
ErrorCodes::NOT_IMPLEMENTED);
if (final)
throw Exception("FINAL cannot be specified when optimizing table of type TrivialBuffer",
ErrorCodes::NOT_IMPLEMENTED);
if (deduplicate)
throw Exception("DEDUPLICATE cannot be specified when optimizing table of type TrivialBuffer",
ErrorCodes::NOT_IMPLEMENTED);
flush(false);
return true;
}
bool StorageTrivialBuffer::checkThresholds(
const time_t current_time, const size_t additional_rows, const size_t additional_bytes) const
{
time_t time_passed = 0;
if (first_write_time)
time_passed = current_time - first_write_time;
size_t rows = current_rows + additional_rows;
size_t bytes = current_bytes + additional_bytes;
return checkThresholdsImpl(rows, bytes, time_passed);
}
bool StorageTrivialBuffer::checkThresholdsImpl(const size_t rows, const size_t bytes,
const time_t time_passed) const
{
if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedAllMinThresholds);
return true;
}
if (time_passed > max_thresholds.time)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeMaxThreshold);
return true;
}
if (rows > max_thresholds.rows)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsMaxThreshold);
return true;
}
if (bytes > max_thresholds.bytes)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesMaxThreshold);
return true;
}
return false;
}
void StorageTrivialBuffer::flushThread()
{
setThreadName("BufferFlush");
do
{
try
{
flush(true);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
while (!shutdown_event.tryWait(1000));
}
void StorageTrivialBuffer::writeBlockToDestination(const Block & block, StoragePtr table)
{
if (no_destination || !block)
return;
if (!table)
{
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " doesn't exist. Block of data is discarded.");
return;
}
auto insert = std::make_shared<ASTInsertQuery>();
insert->database = destination_database;
insert->table = destination_table;
/** Insert only the set of columns that is the intersection of the buffer columns and the destination table columns.
* This helps to support some cases where the two tables' structures differ.
*/
Block structure_of_destination_table = table->getSampleBlock();
Names columns_intersection;
columns_intersection.reserve(block.columns());
for (size_t i : ext::range(0, structure_of_destination_table.columns()))
{
auto dst_col = structure_of_destination_table.getByPosition(i);
if (block.has(dst_col.name))
{
if (block.getByName(dst_col.name).type->getName() != dst_col.type->getName())
{
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table
<< " have different type of column " << dst_col.name << ". Block of data is discarded.");
return;
}
columns_intersection.push_back(dst_col.name);
}
}
if (columns_intersection.empty())
{
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " have no common columns with block in buffer. Block of data is discarded.");
return;
}
if (columns_intersection.size() != block.columns())
LOG_WARNING(log, "Not all columns from block in buffer exist in destination table "
<< destination_database << "." << destination_table << ". Some columns are discarded.");
auto list_of_columns = std::make_shared<ASTExpressionList>();
insert->columns = list_of_columns;
list_of_columns->children.reserve(columns_intersection.size());
for (const String & column : columns_intersection)
list_of_columns->children.push_back(std::make_shared<ASTIdentifier>(StringRange(), column, ASTIdentifier::Column));
InterpreterInsertQuery interpreter{insert, context};
auto block_io = interpreter.execute();
block_io.out->writePrefix();
block_io.out->write(block);
block_io.out->writeSuffix();
}
void StorageTrivialBuffer::alter(
const AlterCommands & params, const String & database_name,
const String & table_name, const Context & context)
{
for (const auto & param : params)
if (param.type == AlterCommand::MODIFY_PRIMARY_KEY)
throw Exception("Storage engine " + getName() + " doesn't support primary key.",
ErrorCodes::NOT_IMPLEMENTED);
auto lock = lockStructureForAlter();
/// To avoid presence of blocks of different structure in the buffer.
flush(false);
params.apply(*columns, materialized_columns, alias_columns, column_defaults);
context.getDatabase(database_name)->alterTable(
context, table_name,
*columns, materialized_columns, alias_columns, column_defaults, {});
}
}

View File

@@ -0,0 +1,234 @@
#pragma once
#include <mutex>
#include <thread>
#include <Common/SipHash.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/IBlockOutputStream.h>
#include <ext/shared_ptr_helper.hpp>
#include <Poco/Event.h>
#include <Storages/IStorage.h>
#include <zkutil/ZooKeeper.h>
namespace Poco { class Logger; }
namespace DB
{
class Context;
/** Stores incoming blocks until some thresholds are exceeded, then sends
* them to the table it looks into, in the same order they came to the buffer.
*
* Thresholds are checked during insert and in a background thread (to control
* the time thresholds).
* If an inserted block exceeds the max limits, the buffer is flushed and then the incoming
* block is appended to the buffer.
*
* Destroying TrivialBuffer or shutting down leads to flushing of the buffer.
* The data in the buffer is not replicated, logged or stored. After a hard reset of the
* server, the data is lost.
*/
class StorageTrivialBuffer : private ext::shared_ptr_helper<StorageTrivialBuffer>, public IStorage
{
friend class ext::shared_ptr_helper<StorageTrivialBuffer>;
friend class TrivialBufferBlockInputStream;
friend class TrivialBufferBlockOutputStream;
public:
struct Thresholds
{
time_t time; /// Seconds after insertion of first block.
size_t rows; /// Number of rows in buffer.
size_t bytes; /// Number of bytes (uncompressed) in buffer.
};
static StoragePtr create(const std::string & name_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
Context & context_, size_t num_blocks_to_deduplicate_,
const String & path_in_zk_for_deduplication_,
const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_);
std::string getName() const override { return "TrivialBuffer"; }
std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
bool checkThresholds(const time_t current_time, const size_t additional_rows = 0,
const size_t additional_bytes = 0) const;
bool checkThresholdsImpl(const size_t rows, const size_t bytes,
const time_t time_passed) const;
/// Writes all the blocks in buffer into the destination table.
void shutdown() override;
bool optimize(const String & partition, bool final, bool deduplicate, const Settings & settings) override;
void rename(const String & new_path_to_db, const String & new_database_name,
const String & new_table_name) override { name = new_table_name; }
bool supportsSampling() const override { return true; }
bool supportsPrewhere() const override { return true; }
bool supportsFinal() const override { return true; }
bool supportsIndexForIn() const override { return true; }
bool supportsParallelReplicas() const override { return true; }
/// Does not check or alter the structure of dependent table.
void alter(const AlterCommands & params, const String & database_name,
const String & table_name, const Context & context) override;
class ZookeeperDeduplicationController
{
public:
using HashType = String;
static HashType getHashFrom(SipHash & hash) { return std::to_string(hash.get64()); }
bool contains(HashType block_hash)
{
std::string res;
return zookeeper->tryGet(path_in_zk_for_deduplication + "/" + block_hash, res);
}
void insert(HashType block_hash)
{
std::vector<String> current_hashes;
if (zookeeper->tryGetChildren(path_in_zk_for_deduplication, current_hashes) == ZNONODE)
{
throw DB::Exception("No node \'" + path_in_zk_for_deduplication + "\' to control deduplication.");
}
// Cleanup zookeeper if needed.
if (current_hashes.size() >= 2*num_blocks_to_deduplicate)
{
using HashWithTimestamp = std::pair<String, time_t>;
std::vector<HashWithTimestamp> hashes_with_timestamps;
for (auto & hash : current_hashes)
{
zkutil::Stat stat;
String res;
String path_in_zk = path_in_zk_for_deduplication + "/" + hash;
if (!zookeeper->tryGet(path_in_zk, res, &stat))
{
throw DB::Exception("Seems like a race conditions between replics was found, path: " + path_in_zk);
}
hashes_with_timestamps.emplace_back(path_in_zk, stat.ctime);
}
// We do not need to sort all the hashes, only 'num_blocks_to_deduplicate' hashes
// with minimum creation time.
auto hashes_with_timestamps_end = hashes_with_timestamps.end();
if (hashes_with_timestamps.size() > num_blocks_to_deduplicate)
hashes_with_timestamps_end = hashes_with_timestamps.begin() + num_blocks_to_deduplicate;
std::partial_sort(hashes_with_timestamps.begin(), hashes_with_timestamps_end, hashes_with_timestamps.end(),
[] (const HashWithTimestamp & a, const HashWithTimestamp & b) -> bool
{
return a.second > b.second;
}
);
zkutil::Ops nodes_to_remove;
for (auto it = hashes_with_timestamps.begin(); it != hashes_with_timestamps_end; ++it)
{
nodes_to_remove.emplace_back(std::make_unique<zkutil::Op::Remove>(it->first, -1));
}
zookeeper->tryMulti(nodes_to_remove);
}
// Finally, inserting new node.
std::string path_for_insert = path_in_zk_for_deduplication + "/" + block_hash;
if (zookeeper->tryCreate(path_for_insert, {},
zkutil::CreateMode::Persistent) != ZOK)
{
throw DB::Exception("Cannot create node at path: " + path_for_insert);
}
}
void updateOnDeduplication(HashType block_hash)
{
zookeeper->createOrUpdate(path_in_zk_for_deduplication + "/" + block_hash,
{}, zkutil::CreateMode::Persistent);
}
ZookeeperDeduplicationController(size_t num_blocks_to_deduplicate_, zkutil::ZooKeeperPtr zookeeper_,
const std::string & path_in_zk_for_deduplication_)
: num_blocks_to_deduplicate(num_blocks_to_deduplicate_),
zookeeper(zookeeper_), path_in_zk_for_deduplication(path_in_zk_for_deduplication_)
{ }
private:
using DeduplicationBuffer = std::unordered_set<HashType>;
size_t num_blocks_to_deduplicate;
zkutil::ZooKeeperPtr zookeeper;
const std::string path_in_zk_for_deduplication;
};
private:
String name;
NamesAndTypesListPtr columns;
Context & context;
std::mutex mutex;
BlocksList data;
size_t current_rows = 0;
size_t current_bytes = 0;
time_t first_write_time = 0;
const size_t num_blocks_to_deduplicate;
const String path_in_zk_for_deduplication;
zkutil::ZooKeeperPtr zookeeper;
ZookeeperDeduplicationController deduplication_controller;
const Thresholds min_thresholds;
const Thresholds max_thresholds;
const String destination_database;
const String destination_table;
/// If set, forces to clean out buffer, not write to destination table.
bool no_destination;
Poco::Logger * log;
Poco::Event shutdown_event;
/// Executes flushing by the time thresholds.
std::thread flush_thread;
StorageTrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
Context & context_, size_t num_blocks_to_deduplicate_,
const String & path_in_zk_for_deduplication_,
const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_);
template <typename DeduplicatioController>
void addBlock(const Block & block, DeduplicatioController & deduplication_controller);
/// Parameter 'table' is passed because it's sometimes pre-computed. It should
/// conform the 'destination_table'.
void writeBlockToDestination(const Block & block, StoragePtr table);
void flush(bool check_thresholds = true, bool is_called_from_background = false);
void flushThread();
};
}

View File

@@ -0,0 +1,35 @@
## ClickHouse integration tests
This directory contains tests that involve several ClickHouse instances, custom configs, ZooKeeper, etc.
### Running
Prerequisites:
* [docker](https://www.docker.com/community-edition#/download). Minimum required API version: 1.25, check with `docker version`.
* [docker-compose](https://docs.docker.com/compose/). To install: `sudo pip install docker-compose`
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo pip install pytest`
If you want to run the tests under a non-privileged user, you must add this user to `docker` group: `sudo usermod -aG docker $USER`.
Run the tests with the `pytest` command. To select which tests to run, use: `pytest -k <test_name_pattern>`
By default tests are run with system-wide client binary, server binary and base configs. To change that,
set the following environment variables:
* `CLICKHOUSE_TESTS_SERVER_BIN_PATH` to choose the server binary.
* `CLICKHOUSE_TESTS_CLIENT_BIN_PATH` to choose the client binary.
* `CLICKHOUSE_TESTS_BASE_CONFIG_DIR` to choose the directory from which base configs (`config.xml` and
`users.xml`) are taken.
### Adding new tests
To add new test named `foo`, create a directory `test_foo` with an empty `__init__.py` and a file
named `test.py` containing tests in it. All functions with names starting with `test` will become test cases.
`helpers` directory contains utilities for:
* Launching a ClickHouse cluster with or without ZooKeeper in docker containers.
* Sending queries to launched instances.
* Introducing network failures such as severing network link between two instances.
To assert that two TSV files must be equal, wrap them in the `TSV` class and use the regular `assert`
statement. Example: `assert TSV(result) == TSV(reference)`. If the assertion fails, `pytest`
automagically detects the types of the variables and prints only a small diff of the two files.
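Tying these pieces together, here is a minimal sketch of what a `test_foo/test.py` might look like, using only the helpers described above; the instance name, fixture structure and query are illustrative and not part of this commit.

# test_foo/test.py -- illustrative sketch using only the helpers described above.
import pytest

from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV

cluster = ClickHouseCluster(__file__)
# No custom config files here; pass a list of paths relative to the test directory if needed.
instance = cluster.add_instance('instance', [])


@pytest.fixture(scope='module')
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def test_select_one(started_cluster):
    # Wrapping both sides in TSV, as recommended above.
    assert TSV(instance.query('SELECT 1')) == TSV('1\n')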

View File

@@ -0,0 +1,5 @@
from helpers.test_tools import TSV
def pytest_assertrepr_compare(op, left, right):
if isinstance(left, TSV) and isinstance(right, TSV) and op == '==':
return ['TabSeparated values differ: '] + left.diff(right)

View File

@@ -0,0 +1,44 @@
import errno
import subprocess as sp
from threading import Timer
class Client:
def __init__(self, host, port=9000, command='/usr/bin/clickhouse-client'):
self.host = host
self.port = port
self.command = [command, '--host', self.host, '--port', str(self.port)]
def query(self, sql, stdin=None, timeout=10.0):
if stdin is None:
command = self.command + ['--multiquery']
stdin = sql
else:
command = self.command + ['--query', sql]
process = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE)
timer = None
if timeout is not None:
def kill_process():
try:
process.kill()
except OSError as e:
if e.errno != errno.ESRCH:
raise
timer = Timer(timeout, kill_process)
timer.start()
stdout, stderr = process.communicate(stdin)
if timer is not None:
if timer.finished.is_set():
raise Exception('Client timed out!')
else:
timer.cancel()
if process.returncode != 0:
raise Exception('Client failed! return code: {}, stderr: {}'.format(process.returncode, stderr))
return stdout

View File

@@ -0,0 +1,229 @@
import os
import os.path as p
import re
import subprocess
import shutil
import socket
import time
import errno
import docker
from .client import Client
HELPERS_DIR = p.dirname(__file__)
class ClickHouseCluster:
"""ClickHouse cluster with several instances and (possibly) ZooKeeper.
Add instances with several calls to add_instance(), then start them with the start() call.
Directories for instances are created in the directory of base_path. After cluster is started,
these directories will contain logs, database files, docker-compose config, ClickHouse configs etc.
"""
def __init__(self, base_path, base_configs_dir=None, server_bin_path=None, client_bin_path=None):
self.base_dir = p.dirname(base_path)
self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR', '/etc/clickhouse-server/')
self.server_bin_path = server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse')
self.client_bin_path = client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client')
self.project_name = os.getlogin() + p.basename(self.base_dir)
# docker-compose removes everything non-alphanumeric from project names so we do it too.
self.project_name = re.sub(r'[^a-z0-9]', '', self.project_name.lower())
self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name]
self.instances = {}
self.with_zookeeper = False
self.is_up = False
def add_instance(self, name, custom_configs, with_zookeeper=False):
"""Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
custom_configs - a list of config files that will be added to config.d/ directory
with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.
"""
if self.is_up:
raise Exception('Can\'t add instance %s: cluster is already up!' % name)
if name in self.instances:
raise Exception('Can\'t add instance %s: there is already an instance with the same name!' % name)
instance = ClickHouseInstance(self.base_dir, name, custom_configs, with_zookeeper, self.base_configs_dir, self.server_bin_path)
self.instances[name] = instance
self.base_cmd.extend(['--file', instance.docker_compose_path])
if with_zookeeper and not self.with_zookeeper:
self.with_zookeeper = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
return instance
def start(self, destroy_dirs=True):
if self.is_up:
return
for instance in self.instances.values():
instance.create_dir(destroy_dir=destroy_dirs)
subprocess.check_call(self.base_cmd + ['up', '-d'])
docker_client = docker.from_env()
for instance in self.instances.values():
# According to how docker-compose names containers.
instance.docker_id = self.project_name + '_' + instance.name + '_1'
container = docker_client.containers.get(instance.docker_id)
instance.ip_address = container.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']
instance.wait_for_start()
instance.client = Client(instance.ip_address, command=self.client_bin_path)
self.is_up = True
def shutdown(self, kill=True):
if kill:
subprocess.check_call(self.base_cmd + ['kill'])
subprocess.check_call(self.base_cmd + ['down', '--volumes'])
self.is_up = False
for instance in self.instances.values():
instance.docker_id = None
instance.ip_address = None
instance.client = None
DOCKER_COMPOSE_TEMPLATE = '''
version: '2'
services:
{name}:
image: ubuntu:14.04
user: '{uid}'
volumes:
- {binary_path}:/usr/bin/clickhouse:ro
- {configs_dir}:/etc/clickhouse-server/
- {db_dir}:/var/lib/clickhouse/
- {logs_dir}:/var/log/clickhouse-server/
entrypoint:
- /usr/bin/clickhouse
- --config-file=/etc/clickhouse-server/config.xml
- --log-file=/var/log/clickhouse-server/clickhouse-server.log
depends_on: {depends_on}
'''
MACROS_CONFIG_TEMPLATE = '''
<yandex>
<macros>
<instance>{name}</instance>
</macros>
</yandex>
'''
class ClickHouseInstance:
def __init__(
self, base_path, name, custom_configs, with_zookeeper,
base_configs_dir, server_bin_path):
self.name = name
self.custom_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_configs]
self.with_zookeeper = with_zookeeper
self.base_configs_dir = base_configs_dir
self.server_bin_path = server_bin_path
self.path = p.abspath(p.join(base_path, name))
self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
self.docker_id = None
self.ip_address = None
self.client = None
def query(self, sql, stdin=None):
return self.client.query(sql, stdin)
def wait_for_start(self, timeout=10.0):
deadline = time.time() + timeout
while True:
if time.time() >= deadline:
raise Exception("Timed out while waiting for instance {} with ip address {} to start".format(self.name, self.ip_address))
# Repeatedly poll the instance address until there is something that listens there.
# Usually it means that ClickHouse is ready to accept queries.
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.ip_address, 9000))
return
except socket.error as e:
if e.errno == errno.ECONNREFUSED:
time.sleep(0.1)
else:
raise
finally:
sock.close()
def create_dir(self, destroy_dir=True):
"""Create the instance directory and all the needed files there."""
if destroy_dir:
self.destroy_dir()
elif p.exists(self.path):
return
os.mkdir(self.path)
configs_dir = p.join(self.path, 'configs')
os.mkdir(configs_dir)
shutil.copy(p.join(self.base_configs_dir, 'config.xml'), configs_dir)
shutil.copy(p.join(self.base_configs_dir, 'users.xml'), configs_dir)
config_d_dir = p.join(configs_dir, 'config.d')
os.mkdir(config_d_dir)
shutil.copy(p.join(HELPERS_DIR, 'common_instance_config.xml'), config_d_dir)
with open(p.join(config_d_dir, 'macros.xml'), 'w') as macros_config:
macros_config.write(MACROS_CONFIG_TEMPLATE.format(name=self.name))
if self.with_zookeeper:
shutil.copy(p.join(HELPERS_DIR, 'zookeeper_config.xml'), config_d_dir)
for path in self.custom_config_paths:
shutil.copy(path, config_d_dir)
db_dir = p.join(self.path, 'database')
os.mkdir(db_dir)
logs_dir = p.join(self.path, 'logs')
os.mkdir(logs_dir)
depends_on = '[]'
if self.with_zookeeper:
depends_on = '["zoo1", "zoo2", "zoo3"]'
with open(self.docker_compose_path, 'w') as docker_compose:
docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
name=self.name,
uid=os.getuid(),
binary_path=self.server_bin_path,
configs_dir=configs_dir,
config_d_dir=config_d_dir,
db_dir=db_dir,
logs_dir=logs_dir,
depends_on=depends_on))
def destroy_dir(self):
if p.exists(self.path):
shutil.rmtree(self.path)

View File

@@ -0,0 +1,4 @@
<yandex>
<timezone>Europe/Moscow</timezone>
<listen_host>::</listen_host>
</yandex>

View File

@@ -0,0 +1,25 @@
version: '2'
services:
zoo1:
image: zookeeper
restart: always
environment:
ZOO_TICK_TIME: 500
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
zoo2:
image: zookeeper
restart: always
environment:
ZOO_TICK_TIME: 500
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
zoo3:
image: zookeeper
restart: always
environment:
ZOO_TICK_TIME: 500
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

View File

@@ -0,0 +1,4 @@
# Helper docker container to run iptables without sudo
FROM alpine
RUN apk add -U iproute2

View File

@@ -0,0 +1,159 @@
import os.path as p
import subprocess
import time
import docker
from .cluster import HELPERS_DIR
class PartitionManager:
"""Allows introducing failures in the network between docker containers.
Can act as a context manager:
with PartitionManager() as pm:
pm.partition_instances(instance1, instance2)
...
# At exit all partitions are removed automatically.
"""
def __init__(self):
self._iptables_rules = []
def isolate_instance_from_zk(self, instance, action='DROP'):
self._check_instance(instance)
self._add_rule({'source': instance.ip_address, 'destination_port': 2181, 'action': action})
self._add_rule({'destination': instance.ip_address, 'source_port': 2181, 'action': action})
def partition_instances(self, left, right, action='DROP'):
self._check_instance(left)
self._check_instance(right)
self._add_rule({'source': left.ip_address, 'destination': right.ip_address, 'action': action})
self._add_rule({'source': right.ip_address, 'destination': left.ip_address, 'action': action})
def heal_all(self):
while self._iptables_rules:
rule = self._iptables_rules.pop()
_NetworkManager.get().delete_iptables_rule(**rule)
@staticmethod
def _check_instance(instance):
if instance.ip_address is None:
raise Exception('Instance ' + instance.name + ' is not launched!')
def _add_rule(self, rule):
_NetworkManager.get().add_iptables_rule(**rule)
self._iptables_rules.append(rule)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.heal_all()
class _NetworkManager:
"""Execute commands inside a container with access to network settings.
We need to call iptables to create partitions, but we want to avoid sudo.
The way to circumvent this restriction is to run iptables in a container with network=host.
The container is long-running and periodically renewed - this is an optimization to avoid the overhead
of container creation on each call.
Source of the idea: https://github.com/worstcase/blockade/blob/master/blockade/host.py
"""
# Singleton instance.
_instance = None
@classmethod
def get(cls, **kwargs):
if cls._instance is None:
cls._instance = cls(**kwargs)
return cls._instance
def add_iptables_rule(self, **kwargs):
cmd = ['iptables', '-A', 'DOCKER']
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
def delete_iptables_rule(self, **kwargs):
cmd = ['iptables', '-D', 'DOCKER']
cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run(cmd, privileged=True)
@staticmethod
def _iptables_cmd_suffix(
source=None, destination=None,
source_port=None, destination_port=None,
action=None):
ret = []
if source is not None:
ret.extend(['-s', source])
if destination is not None:
ret.extend(['-d', destination])
if source_port is not None:
ret.extend(['-p', 'tcp', '--sport', str(source_port)])
if destination_port is not None:
ret.extend(['-p', 'tcp', '--dport', str(destination_port)])
if action is not None:
ret.extend(['-j', action])
return ret
def __init__(
self,
image_name='clickhouse_tests_helper',
image_path=p.join(HELPERS_DIR, 'helper_container'),
container_expire_timeout=50, container_exit_timeout=60):
self.container_expire_timeout = container_expire_timeout
self.container_exit_timeout = container_exit_timeout
self._docker_client = docker.from_env()
try:
self._image = self._docker_client.images.get(image_name)
except docker.errors.ImageNotFound:
self._image = self._docker_client.images.build(tag=image_name, path=image_path, rm=True)
self._container = None
self._ensure_container()
def _ensure_container(self):
if self._container is None or self._container_expire_time <= time.time():
if self._container is not None:
try:
self._container.remove(force=True)
except docker.errors.NotFound:
pass
# Work around https://github.com/docker/docker-py/issues/1477
host_config = self._docker_client.api.create_host_config(network_mode='host', auto_remove=True)
container_id = self._docker_client.api.create_container(
self._image.id, command=('sleep %s' % self.container_exit_timeout),
detach=True, host_config=host_config)['Id']
self._container_expire_time = time.time() + self.container_expire_timeout
self._docker_client.api.start(container_id)
self._container = self._docker_client.containers.get(container_id)
return self._container
def _exec_run(self, cmd, **kwargs):
container = self._ensure_container()
handle = self._docker_client.api.exec_create(container.id, cmd, **kwargs)
output = self._docker_client.api.exec_start(handle).decode('utf8')
exit_code = self._docker_client.api.exec_inspect(handle)['ExitCode']
if exit_code != 0:
print output
raise subprocess.CalledProcessError(exit_code, cmd)
return output

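A minimal sketch of how the helpers in this file are meant to be combined (the node names here are hypothetical; add_instance, start and shutdown are the ClickHouseCluster methods used by the tests later in this change). The context manager guarantees that heal_all() runs and every injected iptables rule is removed, even if an assertion fails:

from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager

cluster = ClickHouseCluster(__file__)
node_a = cluster.add_instance('node_a', [], with_zookeeper=True)
node_b = cluster.add_instance('node_b', [], with_zookeeper=True)

cluster.start()
try:
    with PartitionManager() as pm:
        pm.partition_instances(node_a, node_b)  # drop traffic in both directions
        pm.isolate_instance_from_zk(node_b)     # and cut node_b off from ZooKeeper
        # ... queries issued here observe the network partition ...
finally:
    cluster.shutdown()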
View File

@ -0,0 +1,13 @@
import difflib
class TSV:
"""Helper to get pretty diffs between expected and actual tab-separated value files"""
def __init__(self, contents):
self.lines = contents.readlines() if isinstance(contents, file) else contents.splitlines(True)
def __eq__(self, other):
return self.lines == other.lines
def diff(self, other):
return list(line.rstrip() for line in difflib.context_diff(self.lines, other.lines))[2:]

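A small illustration of the intended use (the inputs are made up): TSV wraps either a string or an open file, __eq__ compares the underlying line lists, and diff() yields a context diff with the two header lines stripped, which keeps failed assertions readable:

from helpers.test_tools import TSV

same = TSV('1\tHello\n2\tWorld\n')
other = TSV('1\tHello\n2\tGoodbye\n')

assert same == TSV('1\tHello\n2\tWorld\n')   # equal contents compare equal
assert not (same == other)                   # no __ne__ is defined, so negate == explicitly

diff_lines = same.diff(other)                # human-readable context diff
assert any('Goodbye' in line for line in diff_lines)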
View File

@ -0,0 +1,17 @@
<yandex>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>1000</session_timeout_ms>
</zookeeper>
</yandex>

View File

@ -0,0 +1,2 @@
[pytest]
python_files = test.py

View File

@ -0,0 +1,17 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>replica1</host>
<port>9000</port>
</replica>
<replica>
<host>replica2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,74 @@
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
cluster = ClickHouseCluster(__file__)
instance_with_dist_table = cluster.add_instance('instance_with_dist_table', ['configs/remote_servers.xml'])
replica1 = cluster.add_instance('replica1', [], with_zookeeper=True)
replica2 = cluster.add_instance('replica2', [], with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for replica in (replica1, replica2):
replica.query(
"CREATE TABLE replicated (d Date, x UInt32) ENGINE = "
"ReplicatedMergeTree('/clickhouse/tables/replicated', '{instance}', d, d, 8192)")
instance_with_dist_table.query(
"CREATE TABLE distributed (d Date, x UInt32) ENGINE = "
"Distributed('test_cluster', 'default', 'replicated')")
yield cluster
finally:
cluster.shutdown()
def test(started_cluster):
with PartitionManager() as pm:
pm.partition_instances(replica1, replica2)
replica2.query("INSERT INTO replicated VALUES ('2017-05-08', 1)")
time.sleep(1) # accrue replica delay
assert replica1.query("SELECT count() FROM replicated").strip() == '0'
assert replica2.query("SELECT count() FROM replicated").strip() == '1'
# With in_order balancing replica1 is chosen.
assert instance_with_dist_table.query(
"SELECT count() FROM distributed SETTINGS load_balancing='in_order'").strip() == '0'
# When we set max_replica_delay, replica1 must be excluded.
assert instance_with_dist_table.query('''
SELECT count() FROM distributed SETTINGS
load_balancing='in_order',
max_replica_delay_for_distributed_queries=1
''').strip() == '1'
pm.isolate_instance_from_zk(replica2)
time.sleep(2) # allow pings to zookeeper to timeout
# At this point all replicas are stale, but the query must still go to replica2 which is the least stale one.
assert instance_with_dist_table.query('''
SELECT count() FROM distributed SETTINGS
load_balancing='in_order',
max_replica_delay_for_distributed_queries=1
''').strip() == '1'
# If we forbid stale replicas, the query must fail.
with pytest.raises(Exception):
instance_with_dist_table.query('''
SELECT count() FROM distributed SETTINGS
load_balancing='in_order',
max_replica_delay_for_distributed_queries=1,
fallback_to_stale_replicas_for_distributed_queries=0
''')

View File

@ -0,0 +1,25 @@
<yandex>
<!-- retention scheme for GraphiteMergeTree engine-->
<graphite_rollup>
<path_column_name>metric</path_column_name>
<time_column_name>timestamp</time_column_name>
<value_column_name>value</value_column_name>
<version_column_name>updated</version_column_name>
<pattern>
<regexp>^one_min</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
</graphite_rollup>
</yandex>

View File

@ -0,0 +1,216 @@
import os.path as p
import time
import datetime
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', ['configs/graphite_rollup.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
instance.query('CREATE DATABASE test')
yield cluster
finally:
cluster.shutdown()
@pytest.fixture
def graphite_table(started_cluster):
instance.query('''
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
''')
yield
instance.query('DROP TABLE test.graphite')
def test_rollup_versions(graphite_table):
timestamp = int(time.time())
rounded_timestamp = timestamp - timestamp % 60
date = datetime.date.today().isoformat()
q = instance.query
# Insert rows with timestamps relative to the current time so that the first retention clause is active.
# Two parts are created.
q('''
INSERT INTO test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 100, {timestamp}, '{date}', 1);
INSERT INTO test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 200, {timestamp}, '{date}', 2);
'''.format(timestamp=timestamp, date=date))
expected1 = '''\
one_min.x1 100 {timestamp} {date} 1
one_min.x1 200 {timestamp} {date} 2
'''.format(timestamp=timestamp, date=date)
assert TSV(q('SELECT * FROM test.graphite ORDER BY updated')) == TSV(expected1)
q('OPTIMIZE TABLE test.graphite')
# After rollup only the row with max version is retained.
expected2 = '''\
one_min.x1 200 {timestamp} {date} 2
'''.format(timestamp=rounded_timestamp, date=date)
assert TSV(q('SELECT * FROM test.graphite')) == TSV(expected2)
def test_rollup_aggregation(graphite_table):
q = instance.query
# This query essentially emulates what rollup does.
result1 = q('''
SELECT avg(v), max(upd)
FROM (SELECT timestamp,
argMax(value, (updated, number)) AS v,
max(updated) AS upd
FROM (SELECT 'one_min.x5' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(intDiv(number, 2)) AS updated,
number
FROM system.numbers LIMIT 1000000)
WHERE intDiv(timestamp, 600) * 600 = 1111444200
GROUP BY timestamp)
''')
expected1 = '''\
999634.9918367347 499999
'''
assert TSV(result1) == TSV(expected1)
# Timestamp 1111111111 is far enough in the past that the last retention clause is active.
result2 = q('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date, toUInt32(intDiv(number, 2)) AS updated
FROM (SELECT * FROM system.numbers LIMIT 1000000)
WHERE intDiv(timestamp, 600) * 600 = 1111444200;
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
expected2 = '''\
one_min.x 999634.9918367347 1111444200 2017-02-02 499999
'''
assert TSV(result2) == TSV(expected2)
def test_rollup_aggregation_2(graphite_table):
result = instance.query('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 - intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
expected = '''\
one_min.x 24 1111110600 2017-02-02 100
'''
assert TSV(result) == TSV(expected)
def test_multiple_paths_and_versions(graphite_table):
result = instance.query('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3) * 600) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
INSERT INTO test.graphite
SELECT 'one_min.y' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + number * 600) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
with open(p.join(p.dirname(__file__), 'test_multiple_paths_and_versions.reference')) as reference:
assert TSV(result) == TSV(reference)
def test_multiple_output_blocks(graphite_table):
MERGED_BLOCK_SIZE = 8192
to_insert = ''
expected = ''
for i in range(2 * MERGED_BLOCK_SIZE + 1):
rolled_up_time = 1000000200 + 600 * i
for j in range(3):
cur_time = rolled_up_time + 100 * j
to_insert += 'one_min.x1 {} {} 2001-09-09 1\n'.format(10 * j, cur_time)
to_insert += 'one_min.x1 {} {} 2001-09-09 2\n'.format(10 * (j + 1), cur_time)
expected += 'one_min.x1 20 {} 2001-09-09 2\n'.format(rolled_up_time)
instance.query('INSERT INTO test.graphite FORMAT TSV', to_insert)
result = instance.query('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
''')
assert TSV(result) == TSV(expected)
def test_paths_not_matching_any_pattern(graphite_table):
to_insert = '''\
one_min.x1 100 1000000000 2001-09-09 1
zzzzzzzz 100 1000000001 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
instance.query('INSERT INTO test.graphite FORMAT TSV', to_insert)
expected = '''\
one_min.x1 100 999999600 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
result = instance.query('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
''')
assert TSV(result) == TSV(expected)

View File

@ -1,128 +0,0 @@
<yandex>
<!-- retention scheme for GraphiteMergeTree engine-->
<graphite_rollup>
<path_column_name>metric</path_column_name>
<time_column_name>timestamp</time_column_name>
<value_column_name>value</value_column_name>
<version_column_name>updated</version_column_name>
<pattern>
<regexp>^one_sec</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>1</precision>
</retention>
<retention>
<age>86400</age>
<precision>5</precision>
</retention>
<retention>
<age>604800</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<regexp>^five_sec</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>604800</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<regexp>^one_min</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<regexp>^five_min</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<regexp>^ten_min</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<regexp>^half_hour</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>1800</precision>
</retention>
</pattern>
<pattern>
<regexp>^one_hour</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<regexp>^one_day</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>86400</precision>
</retention>
</pattern>
<default>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>2592000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</default>
</graphite_rollup>
</yandex>

View File

@ -1,3 +0,0 @@
one_min.x1 100 1486048740 2017-02-02 1
one_min.x1 200 1486048740 2017-02-02 2
one_min.x1 200 1486048740 2017-02-02 2

View File

@ -1,13 +0,0 @@
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
INSERT into test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 100, toUInt32(toDateTime('2017-02-02 18:19:00')), toDate('2017-02-02'), 1);
INSERT into test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 200, toUInt32(toDateTime('2017-02-02 18:19:00')), toDate('2017-02-02'), 2);
SELECT * FROM test.graphite ORDER BY updated;
OPTIMIZE TABLE test.graphite;
SELECT * FROM test.graphite ORDER BY updated;
DROP TABLE test.graphite;

View File

@ -1,2 +0,0 @@
one_min.x 999636.4856809663 1111444200 2017-02-02 499999
999634.9918367347 499999

View File

@ -1,10 +0,0 @@
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
INSERT INTO test.graphite SELECT 'one_min.x' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + intDiv(number, 3)) AS timestamp, toDate('2017-02-02') AS date, toUInt32(intDiv(number, 2)) AS updated FROM (SELECT * FROM system.numbers LIMIT 1000000) WHERE intDiv(timestamp, 600) * 600 = 1111444200;
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
SELECT avg(v), max(upd) FROM (SELECT timestamp, argMax(value, (updated, number)) AS v, max(updated) AS upd FROM (SELECT 'one_min.x5' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + intDiv(number, 3)) AS timestamp, toDate('2017-02-02') AS date, toUInt32(intDiv(number, 2)) AS updated, number FROM system.numbers LIMIT 1000000) WHERE intDiv(timestamp, 600) * 600 = 1111444200 GROUP BY timestamp);
DROP TABLE test.graphite;

View File

@ -1 +0,0 @@
one_min.x 24 1111110600 2017-02-02 100

View File

@ -1,8 +0,0 @@
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
INSERT INTO test.graphite SELECT 'one_min.x' AS metric, toFloat64(number) AS value, toUInt32(1111111111 - intDiv(number, 3)) AS timestamp, toDate('2017-02-02') AS date, toUInt32(100 - number) AS updated FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
DROP TABLE test.graphite;

View File

@ -1,12 +0,0 @@
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
INSERT INTO test.graphite SELECT 'one_min.x' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + intDiv(number, 3) * 600) AS timestamp, toDate('2017-02-02') AS date, toUInt32(100 - number) AS updated FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
INSERT INTO test.graphite SELECT 'one_min.y' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + number * 600) AS timestamp, toDate('2017-02-02') AS date, toUInt32(100 - number) AS updated FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
DROP TABLE test.graphite;

View File

@ -12,3 +12,5 @@
3 Goodbye
1 Hello
3 Goodbye
1 TSKV
4 TSKV Ok

View File

@ -17,6 +17,8 @@ echo -ne '1\tHello\n2\n3\tGoodbye\n\n' | clickhouse-client --input_format_allow_
echo -ne '1\tHello\n2\n3\tGoodbye\n\n' | clickhouse-client --input_format_allow_errors_num=1 --input_format_allow_errors_ratio=0.6 --query="INSERT INTO test.formats_test FORMAT TSV"
echo -ne 'x=1\ts=TSKV\nx=minus2\ts=trash1\ns=trash2\tx=-3\ns=TSKV Ok\tx=4\ns=trash3\tx=-5\n' | clickhouse-client --input_format_allow_errors_num=3 -q "INSERT INTO test.formats_test FORMAT TSKV"
clickhouse-client --query="SELECT * FROM test.formats_test"
clickhouse-client --query="DROP TABLE test.formats_test"

View File

@ -0,0 +1,298 @@
Row 1:
──────
k: 0
count(): 20
Row 2:
──────
k: 1
count(): 20
Row 3:
──────
k: 2
count(): 20
Row 4:
──────
k: 3
count(): 20
Row 5:
──────
k: 4
count(): 20
Totals:
───────
k: 0
count(): 100
Row 1:
──────
k: 0
count(): 20
Row 2:
──────
k: 1
count(): 20
Row 3:
──────
k: 2
count(): 20
Row 4:
──────
k: 3
count(): 20
Row 5:
──────
k: 4
count(): 20
Totals:
───────
k: 0
count(): 100
Min:
────
k: 0
count(): 20
Max:
────
k: 4
count(): 20
Row 1:
──────
k: 0
count(): 20
Row 2:
──────
k: 1
count(): 20
Row 3:
──────
k: 2
count(): 20
Row 4:
──────
k: 3
count(): 20
Row 5:
──────
k: 4
count(): 20
Totals:
───────
k: 0
count(): 100
Min:
────
k: 0
count(): 20
Max:
────
k: 4
count(): 20
Row 1:
──────
k: 0
count(): 20
Row 2:
──────
k: 1
count(): 20
Row 3:
──────
k: 2
count(): 20
Row 4:
──────
k: 3
count(): 20
Showed first 4.
Totals:
───────
k: 0
count(): 100
Min:
────
k: 0
count(): 20
Max:
────
k: 4
count(): 20
Row 1:
──────
k: 0
count(): 20
Row 2:
──────
k: 1
count(): 20
Row 3:
──────
k: 2
count(): 20
Row 4:
──────
k: 3
count(): 20
Showed first 4.
Totals:
───────
k: 0
count(): 100
Min:
────
k: 0
count(): 20
Max:
────
k: 4
count(): 20
Row 1:
──────
k: 0
count(): 20
Row 2:
──────
k: 1
count(): 20
Row 3:
──────
k: 2
count(): 20
Row 4:
──────
k: 3
count(): 20
Showed first 4.
Totals:
───────
k: 0
count(): 100
Min:
────
k: 0
count(): 20
Max:
────
k: 4
count(): 20
Row 1:
──────
k: 0
count(): 20
Row 2:
──────
k: 1
count(): 20
Row 3:
──────
k: 2
count(): 20
Row 4:
──────
k: 3
count(): 20
Row 5:
──────
k: 4
count(): 20
Totals:
───────
k: 0
count(): 100
Min:
────
k: 0
count(): 20
Max:
────
k: 4
count(): 20
Row 1:
──────
k: 0
count(): 20
Row 2:
──────
k: 1
count(): 20
Row 3:
──────
k: 2
count(): 20
Row 4:
──────
k: 3
count(): 20
Showed first 4.
Totals:
───────
k: 0
count(): 100
Min:
────
k: 0
count(): 20
Max:
────
k: 4
count(): 20

View File

@ -0,0 +1,22 @@
SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT Vertical;
SET extremes = 1;
SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT Vertical;
SET output_format_pretty_max_rows = 5;
SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT Vertical;
SET output_format_pretty_max_rows = 4;
SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT Vertical;
SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT VerticalRaw;
SET extremes = 1;
SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT VerticalRaw;
SET output_format_pretty_max_rows = 5;
SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT VerticalRaw;
SET output_format_pretty_max_rows = 4;
SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT VerticalRaw;

View File

@ -0,0 +1 @@
[] [] (0,'','0000-00-00 00:00:00','0000-00-00')

View File

@ -0,0 +1 @@
SELECT defaultValueOfArgumentType([1, 2, 3]), defaultValueOfArgumentType([[[1]]]), defaultValueOfArgumentType((1, 'Hello', now(), today()));

View File

@ -0,0 +1,4 @@
0 0
1 1
0 false
1 true

View File

@ -0,0 +1,6 @@
DROP TABLE IF EXISTS test.json;
CREATE TABLE test.json (x UInt8, title String) ENGINE = Memory;
INSERT INTO test.json FORMAT JSONEachRow {"x": true, "title": "true"}, {"x": false, "title": "false"}, {"x": 0, "title": "0"}, {"x": 1, "title": "1"}
SELECT * FROM test.json ORDER BY title;
DROP TABLE IF EXISTS test.json;

View File

@ -2,6 +2,7 @@
==========
The settings described in this section can be specified in the following ways:
* Globally.
In the server configuration files.

View File

@ -42,6 +42,35 @@ fallback_to_stale_replicas_for_distributed_queries
Default value: 1 (enabled).
input_format_allow_errors_num
-----------------------------
Sets the maximum number of acceptable errors when reading from text formats (CSV, TSV, etc.).
The default value is 0.
Always use it together with ``input_format_allow_errors_ratio``. To skip errors, both settings must be greater than 0.
If an error occurs while reading a row but the error counter is still less than ``input_format_allow_errors_num``, ClickHouse ignores the row and moves on to the next one.
If ``input_format_allow_errors_num`` is exceeded, ClickHouse throws an exception.
input_format_allow_errors_ratio
-------------------------------
Sets the maximum proportion of acceptable errors when reading from text formats (CSV, TSV, etc.).
The proportion of errors is specified as a floating-point number between 0 and 1.
The default value is 0.
Always use it together with ``input_format_allow_errors_num``. To skip errors, both settings must be greater than 0.
If an error occurs while reading a row but the current proportion of errors is still less than ``input_format_allow_errors_ratio``, ClickHouse ignores the row and moves on to the next one.
If ``input_format_allow_errors_ratio`` is exceeded, ClickHouse throws an exception.
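As an illustration, a minimal sketch of skipping malformed rows by passing both settings as clickhouse-client options (it assumes clickhouse-client is on PATH and that the test.formats_test table from the functional test above exists; two of the four TSKV rows below have an unparsable x and are skipped):

import subprocess

rows = 'x=1\ts=TSKV\nx=minus2\ts=trash\nx=3\ts=ok\nx=minus4\ts=trash\n'
proc = subprocess.Popen(
    ['clickhouse-client',
     '--input_format_allow_errors_num=3',
     '--input_format_allow_errors_ratio=0.6',
     '--query=INSERT INTO test.formats_test FORMAT TSKV'],
    stdin=subprocess.PIPE)
proc.communicate(rows)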
max_block_size
--------------
In ClickHouse, data is processed in blocks (sets of column parts). The internal processing loops for a single block are efficient enough, but there is a noticeable per-block overhead. ``max_block_size`` is a recommendation for the block size (in number of rows) to load from a table. The block size should not be too small, so that the per-block overhead stays negligible, and not too large, so that a query with a LIMIT that finishes after the first block completes quickly, so that too much memory is not consumed when extracting a large number of columns in multiple threads, and so that at least some cache locality is preserved.
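For illustration only, a sketch (assuming clickhouse-client is on PATH) of overriding the setting for a single query; the tests above achieve the same thing with a SETTINGS clause in the query text:

import subprocess

# Process system.numbers in blocks of at most 4096 rows instead of the default.
out = subprocess.check_output(
    ['clickhouse-client',
     '--max_block_size=4096',
     '--query=SELECT sum(number) FROM (SELECT number FROM system.numbers LIMIT 100000)'])
print(out)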