diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 0a982249f87..8e17dcc5ca1 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -606,5 +606,13 @@ void Block::unshareColumns() } } +void Block::updateHash(SipHash & hash) const +{ + for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) + { + for (auto & col : getColumns()) + col.column->updateHashWithValue(row_no, hash); + } +} } diff --git a/dbms/src/Core/Block.h b/dbms/src/Core/Block.h index 93c8279c400..8228d67c1c8 100644 --- a/dbms/src/Core/Block.h +++ b/dbms/src/Core/Block.h @@ -119,6 +119,12 @@ public: */ void unshareColumns(); + /** Updates SipHash of the Block, using update method of columns. + * Returns hash for block, that could be used to differentiate blocks + * with same structure, but different data. + */ + void updateHash(SipHash & hash) const; + private: void eraseImpl(size_t position); void initializeIndexByName(); diff --git a/dbms/src/Core/ErrorCodes.cpp b/dbms/src/Core/ErrorCodes.cpp index 2d4d8a26617..6a664735262 100644 --- a/dbms/src/Core/ErrorCodes.cpp +++ b/dbms/src/Core/ErrorCodes.cpp @@ -177,7 +177,7 @@ namespace ErrorCodes extern const int TOO_BIG_AST = 168; extern const int BAD_TYPE_OF_FIELD = 169; extern const int BAD_GET = 170; - extern const int BLOCKS_HAS_DIFFERENT_STRUCTURE = 171; + extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE = 171; extern const int CANNOT_CREATE_DIRECTORY = 172; extern const int CANNOT_ALLOCATE_MEMORY = 173; extern const int CYCLIC_ALIASES = 174; diff --git a/dbms/src/DataStreams/BlockInputStreamFromRowInputStream.cpp b/dbms/src/DataStreams/BlockInputStreamFromRowInputStream.cpp index 6dfa9992e62..852ba3773da 100644 --- a/dbms/src/DataStreams/BlockInputStreamFromRowInputStream.cpp +++ b/dbms/src/DataStreams/BlockInputStreamFromRowInputStream.cpp @@ -12,6 +12,7 @@ namespace ErrorCodes extern const int CANNOT_PARSE_DATE; extern const int CANNOT_PARSE_DATETIME; extern const int CANNOT_READ_ARRAY_FROM_TEXT; + extern const int CANNOT_PARSE_NUMBER; } @@ -33,7 +34,8 @@ static bool isParseError(int code) || code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING || code == ErrorCodes::CANNOT_PARSE_DATE || code == ErrorCodes::CANNOT_PARSE_DATETIME - || code == ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT; + || code == ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT + || code == ErrorCodes::CANNOT_PARSE_NUMBER; } diff --git a/dbms/src/DataStreams/CastTypeBlockInputStream.cpp b/dbms/src/DataStreams/CastTypeBlockInputStream.cpp index 975841bbb5f..ced3fed41b6 100644 --- a/dbms/src/DataStreams/CastTypeBlockInputStream.cpp +++ b/dbms/src/DataStreams/CastTypeBlockInputStream.cpp @@ -55,17 +55,17 @@ Block CastTypeBlockInputStream::readImpl() if (it == cast_description.end()) { + // Leave the same column res.insert(src_column); } else { CastElement & cast_element = it->second; - size_t tmp_col = cast_element.tmp_col_offset; - ColumnNumbers arguments{tmp_col, tmp_col + 1}; - tmp_conversion_block.getByPosition(tmp_col).column = src_column.column; - cast_element.function->execute(tmp_conversion_block, arguments, tmp_col + 2); + tmp_conversion_block.getByPosition(tmp_col).column = src_column.column; + cast_element.function->execute(tmp_conversion_block, ColumnNumbers{tmp_col, tmp_col + 1}, tmp_col + 2); + res.insert(tmp_conversion_block.getByPosition(tmp_col + 2)); } } @@ -93,22 +93,24 @@ void CastTypeBlockInputStream::initialize(const Block & src_block) /// Force conversion if source and destination types is different. 
if (!ref_column.type->equals(*src_column.type)) { - ColumnWithTypeAndName src_columnn_copy = src_column.cloneEmpty(); - ColumnWithTypeAndName alias_column(std::make_shared(1, ref_column.type->getName()), std::make_shared(), ""); - ColumnWithTypeAndName result_column(nullptr, ref_column.type->clone(), src_column.name); + ColumnWithTypeAndName res_type_name_column(std::make_shared(1, ref_column.type->getName()), std::make_shared(), ""); + ColumnWithTypeAndName res_blank_column(nullptr, ref_column.type->clone(), src_column.name); - DataTypePtr unused_return_type; - std::vector unused_prerequisites; - ColumnsWithTypeAndName arguments{src_columnn_copy, alias_column}; - - /// Prepares function to execution. TODO It is not obvious. + /// Prepares function to execution auto cast_function = FunctionFactory::instance().get("CAST", context); - cast_function->getReturnTypeAndPrerequisites(arguments, unused_return_type, unused_prerequisites); + { + DataTypePtr unused_return_type; + std::vector unused_prerequisites; + ColumnsWithTypeAndName arguments{src_column, res_type_name_column}; + cast_function->getReturnTypeAndPrerequisites(arguments, unused_return_type, unused_prerequisites); + } + /// Prefill arguments and result column for current CAST tmp_conversion_block.insert(src_column); - tmp_conversion_block.insert(alias_column); - tmp_conversion_block.insert(result_column); + tmp_conversion_block.insert(res_type_name_column); + tmp_conversion_block.insert(res_blank_column); + /// Index of src_column blank in tmp_conversion_block size_t tmp_col_offset = cast_description.size() * 3; cast_description.emplace(src_col, CastElement(std::move(cast_function), tmp_col_offset)); } diff --git a/dbms/src/DataStreams/CastTypeBlockInputStream.h b/dbms/src/DataStreams/CastTypeBlockInputStream.h index b100c4bee35..91418b61c74 100644 --- a/dbms/src/DataStreams/CastTypeBlockInputStream.h +++ b/dbms/src/DataStreams/CastTypeBlockInputStream.h @@ -27,20 +27,26 @@ private: const Context & context; Block ref_defenition; + /// Initializes cast_description and prepares tmp_conversion_block void initialize(const Block & src_block); bool initialized = false; struct CastElement { + /// Prepared function to do conversion std::shared_ptr function; + /// Position of first function argument in tmp_conversion_block size_t tmp_col_offset; CastElement(std::shared_ptr && function_, size_t tmp_col_offset_); }; /// Describes required conversions on source block + /// Contains column numbers in source block that should be converted std::map cast_description; - /// Auxiliary block, stores arguments and results of required CAST calls + + /// Auxiliary block, stores prefilled arguments and result for each CAST function in cast_description + /// 3 columns are allocated for each conversion: [blank of source column, column with res type name, blank of res column] Block tmp_conversion_block; }; diff --git a/dbms/src/DataStreams/FormatFactory.cpp b/dbms/src/DataStreams/FormatFactory.cpp index dabbbecaa54..d67a718ef7d 100644 --- a/dbms/src/DataStreams/FormatFactory.cpp +++ b/dbms/src/DataStreams/FormatFactory.cpp @@ -155,9 +155,11 @@ static BlockOutputStreamPtr getOutputImpl(const String & name, WriteBuffer & buf else if (name == "PrettySpaceNoEscapes") return std::make_shared(buf, true, settings.output_format_pretty_max_rows, context); else if (name == "Vertical") - return std::make_shared(std::make_shared(buf, sample, context)); + return std::make_shared(std::make_shared( + buf, sample, settings.output_format_pretty_max_rows, context)); else if 
(name == "VerticalRaw") - return std::make_shared(std::make_shared(buf, sample, context)); + return std::make_shared(std::make_shared( + buf, sample, settings.output_format_pretty_max_rows, context)); else if (name == "Values") return std::make_shared(std::make_shared(buf)); else if (name == "JSON") diff --git a/dbms/src/DataStreams/JSONEachRowRowInputStream.cpp b/dbms/src/DataStreams/JSONEachRowRowInputStream.cpp index 2181e0095ca..b4d602c7598 100644 --- a/dbms/src/DataStreams/JSONEachRowRowInputStream.cpp +++ b/dbms/src/DataStreams/JSONEachRowRowInputStream.cpp @@ -124,7 +124,7 @@ bool JSONEachRowRowInputStream::read(Block & block) } skipWhitespaceIfAny(istr); - if (!istr.eof() && *istr.position() == ',') + if (!istr.eof() && (*istr.position() == ',' || *istr.position() == ';')) /// Semicolon is added for convenience as it could be used at end of INSERT query. ++istr.position(); /// Fill non-visited columns with the default values. diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 522ab6ac676..e1c80629f55 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -11,7 +11,7 @@ namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH; - extern const int BLOCKS_HAS_DIFFERENT_STRUCTURE; + extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE; } @@ -130,7 +130,7 @@ void MergingSortedBlockInputStream::init(Block & merged_block, ColumnPlainPtrs & { throw Exception("Merging blocks has different names or types of columns:\n" + shared_block_ptr->dumpStructure() + "\nand\n" + merged_block.dumpStructure(), - ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE); + ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE); } } } diff --git a/dbms/src/DataStreams/TSKVRowInputStream.cpp b/dbms/src/DataStreams/TSKVRowInputStream.cpp index cf0d639df00..9216cddc8b4 100644 --- a/dbms/src/DataStreams/TSKVRowInputStream.cpp +++ b/dbms/src/DataStreams/TSKVRowInputStream.cpp @@ -10,6 +10,7 @@ namespace ErrorCodes extern const int INCORRECT_DATA; extern const int CANNOT_PARSE_ESCAPE_SEQUENCE; extern const int CANNOT_READ_ALL_DATA; + extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED; } @@ -108,6 +109,7 @@ bool TSKVRowInputStream::read(Block & block) { StringRef name_ref; bool has_value = readName(istr, name_ref, name_buf); + ssize_t index = -1; if (has_value) { @@ -126,7 +128,7 @@ bool TSKVRowInputStream::read(Block & block) } else { - size_t index = it->second; + index = it->second; if (read_columns[index]) throw Exception("Duplicate field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); @@ -159,7 +161,16 @@ bool TSKVRowInputStream::read(Block & block) break; } else - throw Exception("Found garbage after field in TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA); + { + /// Possibly a garbage was written into column, remove it + if (index >= 0) + { + block.getByPosition(index).column->popBack(1); + read_columns[index] = false; + } + + throw Exception("Found garbage after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED); + } } } diff --git a/dbms/src/DataStreams/VerticalRowOutputStream.cpp b/dbms/src/DataStreams/VerticalRowOutputStream.cpp index 9e2bec95336..68fa681a65e 100644 --- a/dbms/src/DataStreams/VerticalRowOutputStream.cpp +++ b/dbms/src/DataStreams/VerticalRowOutputStream.cpp @@ -10,8 +10,9 @@ namespace DB { 
-VerticalRowOutputStream::VerticalRowOutputStream(WriteBuffer & ostr_, const Block & sample_, const Context & context) - : ostr(ostr_), sample(sample_) +VerticalRowOutputStream::VerticalRowOutputStream( + WriteBuffer & ostr_, const Block & sample_, size_t max_rows_, const Context & context) + : ostr(ostr_), sample(sample_), max_rows(max_rows_) { size_t columns = sample.columns(); @@ -60,6 +61,9 @@ void VerticalRowOutputStream::flush() void VerticalRowOutputStream::writeField(const IColumn & column, const IDataType & type, size_t row_num) { + if (row_number > max_rows) + return; + writeString(names_and_paddings[field_number], ostr); writeValue(column, type, row_num); writeChar('\n', ostr); @@ -82,6 +86,10 @@ void VerticalRawRowOutputStream::writeValue(const IColumn & column, const IDataT void VerticalRowOutputStream::writeRowStartDelimiter() { ++row_number; + + if (row_number > max_rows) + return; + writeCString("Row ", ostr); writeIntText(row_number, ostr); writeCString(":\n", ostr); @@ -95,9 +103,77 @@ void VerticalRowOutputStream::writeRowStartDelimiter() void VerticalRowOutputStream::writeRowBetweenDelimiter() { + if (row_number > max_rows) + return; + writeCString("\n", ostr); field_number = 0; } +void VerticalRowOutputStream::writeSuffix() +{ + if (row_number > max_rows) + { + writeCString("Showed first ", ostr); + writeIntText(max_rows, ostr); + writeCString(".\n", ostr); + } + + if (totals || extremes) + { + writeCString("\n", ostr); + writeTotals(); + writeExtremes(); + } +} + + +void VerticalRowOutputStream::writeSpecialRow(const Block & block, size_t row_num, const char * title) +{ + writeCString("\n", ostr); + + row_number = 0; + field_number = 0; + + size_t columns = block.columns(); + + writeCString(title, ostr); + writeCString(":\n", ostr); + + size_t width = strlen(title) + 1; + for (size_t i = 0; i < width; ++i) + writeCString("─", ostr); + writeChar('\n', ostr); + + for (size_t i = 0; i < columns; ++i) + { + if (i != 0) + writeFieldDelimiter(); + + auto & col = block.getByPosition(i); + writeField(*col.column.get(), *col.type.get(), row_num); + } +} + + +void VerticalRowOutputStream::writeTotals() +{ + if (totals) + { + writeSpecialRow(totals, 0, "Totals"); + } +} + + +void VerticalRowOutputStream::writeExtremes() +{ + if (extremes) + { + writeSpecialRow(extremes, 0, "Min"); + writeSpecialRow(extremes, 1, "Max"); + } +} + + } diff --git a/dbms/src/DataStreams/VerticalRowOutputStream.h b/dbms/src/DataStreams/VerticalRowOutputStream.h index 98e440a6b15..f3a4444734a 100644 --- a/dbms/src/DataStreams/VerticalRowOutputStream.h +++ b/dbms/src/DataStreams/VerticalRowOutputStream.h @@ -18,24 +18,37 @@ class Context; class VerticalRowOutputStream : public IRowOutputStream { public: - VerticalRowOutputStream(WriteBuffer & ostr_, const Block & sample_, const Context & context); + VerticalRowOutputStream(WriteBuffer & ostr_, const Block & sample_, size_t max_rows_, const Context & context); void writeField(const IColumn & column, const IDataType & type, size_t row_num) override; void writeRowStartDelimiter() override; void writeRowBetweenDelimiter() override; + void writeSuffix() override; void flush() override; + void setTotals(const Block & totals_) override { totals = totals_; } + void setExtremes(const Block & extremes_) override { extremes = extremes_; } + protected: virtual void writeValue(const IColumn & column, const IDataType & type, size_t row_num) const; + void writeTotals(); + void writeExtremes(); + /// For totals and extremes. 
+ void writeSpecialRow(const Block & block, size_t row_num, const char * title); + WriteBuffer & ostr; const Block sample; + size_t max_rows; size_t field_number = 0; size_t row_number = 0; using NamesAndPaddings = std::vector; NamesAndPaddings names_and_paddings; + + Block totals; + Block extremes; }; @@ -44,8 +57,7 @@ protected: class VerticalRawRowOutputStream final : public VerticalRowOutputStream { public: - VerticalRawRowOutputStream(WriteBuffer & ostr_, const Block & sample_, const Context & context) - : VerticalRowOutputStream(ostr_, sample_, context) {} + using VerticalRowOutputStream::VerticalRowOutputStream; protected: void writeValue(const IColumn & column, const IDataType & type, size_t row_num) const override; diff --git a/dbms/src/Storages/MergeTree/ActiveDataPartSet.h b/dbms/src/Storages/MergeTree/ActiveDataPartSet.h index 657e5956141..ac93f3716f5 100644 --- a/dbms/src/Storages/MergeTree/ActiveDataPartSet.h +++ b/dbms/src/Storages/MergeTree/ActiveDataPartSet.h @@ -31,24 +31,13 @@ public: bool operator<(const Part & rhs) const { - if (month != rhs.month) - return month < rhs.month; - - if (left != rhs.left) - return left < rhs.left; - if (right != rhs.right) - return right < rhs.right; - - if (level != rhs.level) - return level < rhs.level; - - return false; + return std::tie(month, left, right, level) < std::tie(rhs.month, rhs.left, rhs.right, rhs.level); } - /// Contains another part (obtained after combining another part with some other) + /// Contains another part (obtained after merging another part with some other) bool contains(const Part & rhs) const { - return month == rhs.month /// Parts for different months are not combined + return month == rhs.month /// Parts for different months are not merged && left_date <= rhs.left_date && right_date >= rhs.right_date && left <= rhs.left diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index 454c646e6ac..154511c511b 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -39,7 +39,7 @@ namespace DB namespace ErrorCodes { extern const int INFINITE_LOOP; - extern const int BLOCKS_HAS_DIFFERENT_STRUCTURE; + extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE; } @@ -198,7 +198,7 @@ static void appendBlock(const Block & from, Block & to) if (col_from.getName() != col_to.getName()) throw Exception("Cannot append block to another: different type of columns at index " + toString(column_no) - + ". Block 1: " + from.dumpStructure() + ". Block 2: " + to.dumpStructure(), ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE); + + ". Block 1: " + from.dumpStructure() + ". Block 2: " + to.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE); col_to.insertRangeFrom(col_from, 0, rows); } diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp index a9223385c78..48cfe7dc39a 100644 --- a/dbms/src/Storages/StorageFactory.cpp +++ b/dbms/src/Storages/StorageFactory.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -556,6 +557,54 @@ StoragePtr StorageFactory::get( num_buckets, {min_time, min_rows, min_bytes}, {max_time, max_rows, max_bytes}, destination_database, destination_table); } + else if (name == "TrivialBuffer") + { + /** TrivialBuffer(db, table, num_blocks_to_deduplicate, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes, path_in_zookeeper) + * + * db, table - in which table to put data from buffer. 
+ * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for pushing out from the buffer. + * num_blocks_to_deduplicate - level of parallelism. + */ + + const std::string error_message_argument_number_mismatch = "Storage TrivialBuffer requires 10 parameters: " + " destination database, destination table, num_blocks_to_deduplicate, min_time, max_time, min_rows," + " max_rows, min_bytes, max_bytes, path_in_zookeeper."; + ASTs & args_func = typeid_cast(*typeid_cast(*query).storage).children; + + if (args_func.size() != 1) + throw Exception(error_message_argument_number_mismatch, + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + ASTs & args = typeid_cast(*args_func.at(0)).children; + + if (args.size() != 10) + throw Exception(error_message_argument_number_mismatch, + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + args[0] = evaluateConstantExpressionOrIdentidierAsLiteral(args[0], local_context); + args[1] = evaluateConstantExpressionOrIdentidierAsLiteral(args[1], local_context); + + String destination_database = static_cast(*args[0]).value.safeGet(); + String destination_table = static_cast(*args[1]).value.safeGet(); + + size_t num_blocks_to_deduplicate = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*args[2]).value); + + time_t min_time = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*args[3]).value); + time_t max_time = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*args[4]).value); + size_t min_rows = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*args[5]).value); + size_t max_rows = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*args[6]).value); + size_t min_bytes = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*args[7]).value); + size_t max_bytes = applyVisitor(FieldVisitorConvertToNumber(), typeid_cast(*args[8]).value); + + String path_in_zk_for_deduplication = static_cast(*args[9]).value.safeGet(); + + return StorageTrivialBuffer::create( + table_name, columns, + materialized_columns, alias_columns, column_defaults, + context, num_blocks_to_deduplicate, path_in_zk_for_deduplication, + {min_time, min_rows, min_bytes}, {max_time, max_rows, max_bytes}, + destination_database, destination_table); + } else if (endsWith(name, "MergeTree")) { /** [Replicated][|Summing|Collapsing|Aggregating|Unsorted|Replacing|Graphite]MergeTree (2 * 7 combinations) engines diff --git a/dbms/src/Storages/StorageTrivialBuffer.cpp b/dbms/src/Storages/StorageTrivialBuffer.cpp new file mode 100644 index 00000000000..4fc06cdc6df --- /dev/null +++ b/dbms/src/Storages/StorageTrivialBuffer.cpp @@ -0,0 +1,561 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + + +namespace ProfileEvents +{ + extern const Event StorageBufferFlush; + extern const Event StorageBufferErrorOnFlush; + extern const Event StorageBufferPassedAllMinThresholds; + extern const Event StorageBufferPassedTimeMaxThreshold; + extern const Event StorageBufferPassedRowsMaxThreshold; + extern const Event StorageBufferPassedBytesMaxThreshold; +} + +namespace CurrentMetrics +{ + extern const Metric StorageBufferRows; + extern const Metric StorageBufferBytes; +} + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int INFINITE_LOOP; + extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE; +} + + +StoragePtr StorageTrivialBuffer::create(const std::string & name_, NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & 
alias_columns_, + const ColumnDefaults & column_defaults_, + Context & context_, const size_t num_blocks_to_deduplicate_, + const String & path_in_zk_for_deduplication_, + const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, + const String & destination_database_, const String & destination_table_) +{ + return make_shared( + name_, columns_, materialized_columns_, alias_columns_, column_defaults_, + context_, num_blocks_to_deduplicate_, path_in_zk_for_deduplication_, + min_thresholds_, max_thresholds_, + destination_database_, destination_table_); +} + + +StorageTrivialBuffer::StorageTrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_, + Context & context_, const size_t num_blocks_to_deduplicate_, + const String & path_in_zk_for_deduplication_, + const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, + const String & destination_database_, const String & destination_table_) + : IStorage{materialized_columns_, alias_columns_, column_defaults_}, + name(name_), columns(columns_), context(context_), + num_blocks_to_deduplicate(num_blocks_to_deduplicate_), + path_in_zk_for_deduplication(path_in_zk_for_deduplication_), + zookeeper(context.getZooKeeper()), + deduplication_controller(num_blocks_to_deduplicate, zookeeper, path_in_zk_for_deduplication), + min_thresholds(min_thresholds_), max_thresholds(max_thresholds_), + destination_database(destination_database_), destination_table(destination_table_), + no_destination(destination_database.empty() && destination_table.empty()), + log(&Logger::get("TrivialBuffer (" + name + ")")), + flush_thread(&StorageTrivialBuffer::flushThread, this) +{ + zookeeper->createAncestors(path_in_zk_for_deduplication); + zookeeper->createOrUpdate(path_in_zk_for_deduplication, {}, zkutil::CreateMode::Persistent); +} + +class TrivialBufferBlockInputStream : public IProfilingBlockInputStream +{ +public: + TrivialBufferBlockInputStream(const Names & column_names_, BlocksList::iterator begin_, + BlocksList::iterator end_, StorageTrivialBuffer & buffer_) + : column_names(column_names_), buffer(buffer_), + begin(begin_), end(end_), it(begin_) {} + + String getName() const { return "TrivialStorageBuffer"; } + + String getID() const + { + std::stringstream res; + res << "TrivialStorageBuffer(" << &buffer; + + for (const auto & name : column_names) + res << ", " << name; + + res << ")"; + return res.str(); + } + +protected: + Block readImpl() + { + Block res; + + if (it == end) + return res; + + for (const auto & column : column_names) + res.insert(it->getByName(column)); + ++it; + + return res; + } + +private: + Names column_names; + StorageTrivialBuffer & buffer; + BlocksList::iterator begin, end, it; +}; + +BlockInputStreams StorageTrivialBuffer::read( + const Names & column_names, + ASTPtr query, + const Context & context, + const Settings & settings, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size, + unsigned threads) +{ + check(column_names); + processed_stage = QueryProcessingStage::FetchColumns; + + BlockInputStreams streams; + + if (!no_destination) + { + auto destination = context.getTable(destination_database, destination_table); + + if (destination.get() == this) + throw Exception("Destination table is myself. 
Read will cause infinite loop.", + ErrorCodes::INFINITE_LOOP); + + /** TrivialStorageBuffer does not support 'PREWHERE', + * so turn off corresponding optimization. + */ + Settings modified_settings = settings; + modified_settings.optimize_move_to_prewhere = false; + + streams = destination->read(column_names, query, context, modified_settings, + processed_stage, max_block_size, threads); + } + + BlockInputStreams streams_from_buffers; + std::lock_guard lock(mutex); + size_t size = data.size(); + if (threads > size) + threads = size; + + for (size_t thread = 0; thread < threads; ++thread) + { + BlocksList::iterator begin = data.begin(); + BlocksList::iterator end = data.begin(); + + std::advance(begin, thread * size / threads); + std::advance(end, (thread + 1) * size / threads); + + streams_from_buffers.push_back(std::make_shared(column_names, begin, end, *this)); + } + + /** If sources from destination table are already processed to non-starting stage, then we should wrap + * sources from the buffer to the same stage of processing conveyor. + */ + if (processed_stage > QueryProcessingStage::FetchColumns) + for (auto & stream : streams_from_buffers) + stream = InterpreterSelectQuery(query, context, processed_stage, 0, stream).execute().in; + + streams.insert(streams.end(), streams_from_buffers.begin(), streams_from_buffers.end()); + return streams; +} + +template +void StorageTrivialBuffer::addBlock(const Block & block, DeduplicatioController & deduplication_controller) +{ + SipHash hash; + block.updateHash(hash); + typename DeduplicatioController::HashType block_hash = DeduplicatioController::getHashFrom(hash); + + std::lock_guard lock(mutex); + if (!deduplication_controller.contains(block_hash)) + { + deduplication_controller.insert(block_hash); + current_rows += block.rows(); + current_bytes += block.bytes(); + data.push_back(block); + + CurrentMetrics::add(CurrentMetrics::StorageBufferRows, current_rows); + CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, current_bytes); + } + else + { + deduplication_controller.updateOnDeduplication(block_hash); + } +} + +void StorageTrivialBuffer::flush(bool check_thresholds, bool is_called_from_background) +{ + Block block_to_write; + time_t current_time = time(0); + + time_t time_passed = 0; + + if (data.empty()) + return; + + BlocksList::iterator flush_begin, flush_end; + { + std::unique_lock lock(mutex, std::try_to_lock_t()); + + if (!lock.owns_lock()) + { + // NOTE: is this the behavior we expect from 'flush' concurrency? + if (!is_called_from_background) + LOG_ERROR(log, "Method \'StorageTrivialBuffer::flush\' was called simultaneously from different threads"); + return; + } + + if (first_write_time) + time_passed = current_time - first_write_time; + + if (check_thresholds) + { + if (!checkThresholdsImpl(current_rows, current_bytes, time_passed)) + return; + } + else + { + if (current_rows == 0) + return; + } + + flush_begin = data.begin(); + flush_end = std::prev(data.end()); + block_to_write = flush_begin->cloneEmpty(); + } + + /// Collecting BlockList into single block. 
+ block_to_write.checkNumberOfRows(); + flush_end = std::next(flush_end); + for (auto block = flush_begin; block != flush_end; ++block) + { + block->checkNumberOfRows(); + for (size_t column_no = 0, columns = block->columns(); column_no < columns; ++column_no) + { + IColumn & col_to = *block_to_write.safeGetByPosition(column_no).column.get(); + const IColumn & col_from = *block->getByName(col_to.getName()).column.get(); + + col_to.insertRangeFrom(col_from, 0, block->rows()); + } + + } + first_write_time = 0; + + ProfileEvents::increment(ProfileEvents::StorageBufferFlush); + + LOG_TRACE(log, "Flushing buffer with " << block_to_write.rows() << " rows, " << block_to_write.bytes() << " bytes, age " << time_passed << " seconds."); + + if (no_destination) + return; + + try + { + writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table)); + data.erase(flush_begin, flush_end); + + CurrentMetrics::sub(CurrentMetrics::StorageBufferRows, block_to_write.rows()); + CurrentMetrics::sub(CurrentMetrics::StorageBufferBytes, block_to_write.bytes()); + + } + catch (...) + { + ProfileEvents::increment(ProfileEvents::StorageBufferErrorOnFlush); + + if (!first_write_time) + first_write_time = current_time; + + /// We'll retry to write in a moment. + throw; + } + +} + +class TrivialBufferBlockOutputStream : public IBlockOutputStream +{ +public: + TrivialBufferBlockOutputStream(StorageTrivialBuffer & buffer_) : buffer(buffer_) {} + void write(const Block & block) override + { + if (!block) + return; + + size_t rows = block.rows(); + size_t bytes = block.bytes(); + if (!rows) + return; + + StoragePtr destination; + if (!buffer.no_destination) + { + destination = buffer.context.tryGetTable(buffer.destination_database, + buffer.destination_table); + + if (destination) + { + if (destination.get() == &buffer) + throw Exception("Destination table is myself. Write will " + "cause infinite loop.", ErrorCodes::INFINITE_LOOP); + + try + { + destination->check(block, true); + } + catch (Exception & e) + { + e.addMessage("(when looking at destination table " + + buffer.destination_database + "." + + buffer.destination_table + ")"); + throw; + } + } + } + + time_t current_time = time(0); + if (buffer.checkThresholds(current_time, rows, bytes)) + { + /** We'll try to flush the buffer if thresholds are overdrafted. + * It avoids unlimited memory consuming, bcause if we failed to write + * data down to the destination table, we'll throw an exception and + * the new block will not be appended to the buffer. + */ + + buffer.flush(true); + } + + if (!buffer.first_write_time) + buffer.first_write_time = current_time; + + buffer.addBlock/**/(block, buffer.deduplication_controller); + } +private: + StorageTrivialBuffer & buffer; +}; + +BlockOutputStreamPtr StorageTrivialBuffer::write(const ASTPtr & query, const Settings & settings) +{ + return std::make_shared(*this); +} + +void StorageTrivialBuffer::shutdown() +{ + shutdown_event.set(); + + if (flush_thread.joinable()) + flush_thread.join(); + + try + { + flush(false); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + +} + +/** NOTE If you do OPTIMIZE after insertion, + * it does not guarantee that all data will be in destination table at the time of + * next SELECT just after OPTIMIZE. 
+ * + * Because in case if there was already running flush method, + * then call to flush inside OPTIMIZE will see empty buffer and return quickly, + * but at the same time, the already running flush method possibly is not finished, + * so next SELECT will observe missing data. + * + * This kind of race condition make very hard to implement proper tests. + */ +bool StorageTrivialBuffer::optimize(const String & partition, bool final, bool deduplicate, const Settings & settings) +{ + if (!partition.empty()) + throw Exception("Partition cannot be specified when optimizing table of type TrivialBuffer", + ErrorCodes::NOT_IMPLEMENTED); + + if (final) + throw Exception("FINAL cannot be specified when optimizing table of type TrivialBuffer", + ErrorCodes::NOT_IMPLEMENTED); + + if (deduplicate) + throw Exception("DEDUPLICATE cannot be specified when optimizing table of type TrivialBuffer", + ErrorCodes::NOT_IMPLEMENTED); + + flush(false); + return true; +} + + + +bool StorageTrivialBuffer::checkThresholds( + const time_t current_time, const size_t additional_rows, const size_t additional_bytes) const +{ + time_t time_passed = 0; + if (first_write_time) + time_passed = current_time - first_write_time; + + size_t rows = current_rows + additional_rows; + size_t bytes = current_bytes + additional_bytes; + + return checkThresholdsImpl(rows, bytes, time_passed); + +} + +bool StorageTrivialBuffer::checkThresholdsImpl(const size_t rows, const size_t bytes, + const time_t time_passed) const +{ + if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes) + { + ProfileEvents::increment(ProfileEvents::StorageBufferPassedAllMinThresholds); + return true; + } + + if (time_passed > max_thresholds.time) + { + ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeMaxThreshold); + return true; + } + + if (rows > max_thresholds.rows) + { + ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsMaxThreshold); + return true; + } + + if (bytes > max_thresholds.bytes) + { + ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesMaxThreshold); + return true; + } + + return false; +} + +void StorageTrivialBuffer::flushThread() +{ + setThreadName("BufferFlush"); + + do + { + try + { + flush(true); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + while (!shutdown_event.tryWait(1000)); +} + +void StorageTrivialBuffer::writeBlockToDestination(const Block & block, StoragePtr table) +{ + if (no_destination || !block) + return; + + if (!table) + { + LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " doesn't exist. Block of data is discarded."); + return; + } + + auto insert = std::make_shared(); + + insert->database = destination_database; + insert->table = destination_table; + + /** Inserting the set columns which is the intersection of buffer columns and destination table ones. + * It will help us to support some cases with different tables' structures. + */ + Block structure_of_destination_table = table->getSampleBlock(); + Names columns_intersection; + columns_intersection.reserve(block.columns()); + for (size_t i : ext::range(0, structure_of_destination_table.columns())) + { + auto dst_col = structure_of_destination_table.getByPosition(i); + if (block.has(dst_col.name)) + { + if (block.getByName(dst_col.name).type->getName() != dst_col.type->getName()) + { + LOG_ERROR(log, "Destination table " << destination_database << "." 
<< destination_table + << " have different type of column " << dst_col.name << ". Block of data is discarded."); + return; + } + + columns_intersection.push_back(dst_col.name); + } + } + + if (columns_intersection.empty()) + { + LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " have no common columns with block in buffer. Block of data is discarded."); + return; + } + + if (columns_intersection.size() != block.columns()) + LOG_WARNING(log, "Not all columns from block in buffer exist in destination table " + << destination_database << "." << destination_table << ". Some columns are discarded."); + + auto list_of_columns = std::make_shared(); + insert->columns = list_of_columns; + list_of_columns->children.reserve(columns_intersection.size()); + for (const String & column : columns_intersection) + list_of_columns->children.push_back(std::make_shared(StringRange(), column, ASTIdentifier::Column)); + + InterpreterInsertQuery interpreter{insert, context}; + + auto block_io = interpreter.execute(); + block_io.out->writePrefix(); + block_io.out->write(block); + block_io.out->writeSuffix(); +} + +void StorageTrivialBuffer::alter( + const AlterCommands & params, const String & database_name, + const String & table_name, const Context & context) +{ + for (const auto & param : params) + if (param.type == AlterCommand::MODIFY_PRIMARY_KEY) + throw Exception("Storage engine " + getName() + " doesn't support primary key.", + ErrorCodes::NOT_IMPLEMENTED); + + auto lock = lockStructureForAlter(); + + /// To avoid presence of blocks of different structure in the buffer. + flush(false); + + params.apply(*columns, materialized_columns, alias_columns, column_defaults); + + context.getDatabase(database_name)->alterTable( + context, table_name, + *columns, materialized_columns, alias_columns, column_defaults, {}); +} + +} diff --git a/dbms/src/Storages/StorageTrivialBuffer.h b/dbms/src/Storages/StorageTrivialBuffer.h new file mode 100644 index 00000000000..7969dc4dbe1 --- /dev/null +++ b/dbms/src/Storages/StorageTrivialBuffer.h @@ -0,0 +1,234 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace Poco { class Logger; } + +namespace DB +{ + +class Context; + +/** Stores incoming blocks until some thresholds are exceeded, then sends + * them to the table it looks into in the same order they came to the buffer. + * + * Thresolds are checked during insert and in background thread (to control + * time thresholds). + * If inserted block exceedes max limits, buffer is flushed and then the incoming + * block is appended to buffer. + * + * Destroying TrivialBuffer or shutting down lead to the buffer flushing. + * The data in the buffer is not replicated, logged or stored. After hard reset of the + * server, the data is lost. + */ +class StorageTrivialBuffer : private ext::shared_ptr_helper, public IStorage +{ +friend class ext::shared_ptr_helper; +friend class TrivialBufferBlockInputStream; +friend class TrivialBufferBlockOutputStream; + +public: + struct Thresholds + { + time_t time; /// Seconds after insertion of first block. + size_t rows; /// Number of rows in buffer. + size_t bytes; /// Number of bytes (incompressed) in buffer. 
+ }; + + static StoragePtr create(const std::string & name_, NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_, + Context & context_, size_t num_blocks_to_deduplicate_, + const String & path_in_zk_for_deduplication_, + const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, + const String & destination_database_, const String & destination_table_); + + std::string getName() const override { return "TrivialBuffer"; } + std::string getTableName() const override { return name; } + + const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } + + BlockInputStreams read( + const Names & column_names, + ASTPtr query, + const Context & context, + const Settings & settings, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size = DEFAULT_BLOCK_SIZE, + unsigned threads = 1) override; + + BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override; + + bool checkThresholds(const time_t current_time, const size_t additional_rows = 0, + const size_t additional_bytes = 0) const; + bool checkThresholdsImpl(const size_t rows, const size_t bytes, + const time_t time_passed) const; + + /// Writes all the blocks in buffer into the destination table. + void shutdown() override; + bool optimize(const String & partition, bool final, bool deduplicate, const Settings & settings) override; + + void rename(const String & new_path_to_db, const String & new_database_name, + const String & new_table_name) override { name = new_table_name; } + + bool supportsSampling() const override { return true; } + bool supportsPrewhere() const override { return true; } + bool supportsFinal() const override { return true; } + bool supportsIndexForIn() const override { return true; } + bool supportsParallelReplicas() const override { return true; } + + /// Does not check or alter the structure of dependent table. + void alter(const AlterCommands & params, const String & database_name, + const String & table_name, const Context & context) override; + + class ZookeeperDeduplicationController + { + public: + using HashType = String; + + static HashType getHashFrom(SipHash & hash) { return std::to_string(hash.get64()); } + + bool contains(HashType block_hash) + { + std::string res; + return zookeeper->tryGet(path_in_zk_for_deduplication + "/" + block_hash, res); + } + + void insert(HashType block_hash) + { + std::vector current_hashes; + if (zookeeper->tryGetChildren(path_in_zk_for_deduplication, current_hashes) == ZNONODE) + { + throw DB::Exception("No node \'" + path_in_zk_for_deduplication + "\' to control deduplication."); + } + + // Cleanup zookeeper if needed. + if (current_hashes.size() >= 2*num_blocks_to_deduplicate) + { + using HashWithTimestamp = std::pair; + std::vector hashes_with_timestamps; + for (auto & hash : current_hashes) + { + zkutil::Stat stat; + String res; + String path_in_zk = path_in_zk_for_deduplication + "/" + hash; + if (!zookeeper->tryGet(path_in_zk, res, &stat)) + { + throw DB::Exception("Seems like a race conditions between replics was found, path: " + path_in_zk); + } + hashes_with_timestamps.emplace_back(path_in_zk, stat.ctime); + } + // We do not need to sort all the hashes, only 'num_blocks_to_deduplicate' hashes + // with minimum creation time. 
+ auto hashes_with_timestamps_end = hashes_with_timestamps.end(); + if (hashes_with_timestamps.size() > num_blocks_to_deduplicate) + hashes_with_timestamps_end = hashes_with_timestamps.begin() + num_blocks_to_deduplicate; + std::partial_sort(hashes_with_timestamps.begin(), hashes_with_timestamps_end, hashes_with_timestamps.end(), + [] (const HashWithTimestamp & a, const HashWithTimestamp & b) -> bool + { + return a.second > b.second; + } + ); + zkutil::Ops nodes_to_remove; + for (auto it = hashes_with_timestamps.begin(); it != hashes_with_timestamps_end; ++it) + { + nodes_to_remove.emplace_back(std::make_unique(it->first, -1)); + } + zookeeper->tryMulti(nodes_to_remove); + } + + // Finally, inserting new node. + std::string path_for_insert = path_in_zk_for_deduplication + "/" + block_hash; + if (zookeeper->tryCreate(path_for_insert, {}, + zkutil::CreateMode::Persistent) != ZOK) + { + throw DB::Exception("Cannot create node at path: " + path_for_insert); + } + + } + + void updateOnDeduplication(HashType block_hash) + { + zookeeper->createOrUpdate(path_in_zk_for_deduplication + "/" + block_hash, + {}, zkutil::CreateMode::Persistent); + } + + ZookeeperDeduplicationController(size_t num_blocks_to_deduplicate_, zkutil::ZooKeeperPtr zookeeper_, + const std::string & path_in_zk_for_deduplication_) + : num_blocks_to_deduplicate(num_blocks_to_deduplicate_), + zookeeper(zookeeper_), path_in_zk_for_deduplication(path_in_zk_for_deduplication_) + { } + + private: + using DeduplicationBuffer = std::unordered_set; + + size_t num_blocks_to_deduplicate; + zkutil::ZooKeeperPtr zookeeper; + const std::string path_in_zk_for_deduplication; + }; + + +private: + String name; + NamesAndTypesListPtr columns; + + Context & context; + + std::mutex mutex; + + BlocksList data; + + size_t current_rows = 0; + size_t current_bytes = 0; + time_t first_write_time = 0; + const size_t num_blocks_to_deduplicate; + const String path_in_zk_for_deduplication; + zkutil::ZooKeeperPtr zookeeper; + ZookeeperDeduplicationController deduplication_controller; + + const Thresholds min_thresholds; + const Thresholds max_thresholds; + + const String destination_database; + const String destination_table; + /// If set, forces to clean out buffer, not write to destination table. + bool no_destination; + + Poco::Logger * log; + + Poco::Event shutdown_event; + /// Executes flushing by the time thresholds. + std::thread flush_thread; + + StorageTrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_, + Context & context_, size_t num_blocks_to_deduplicate_, + const String & path_in_zk_for_deduplication_, + const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, + const String & destination_database_, const String & destination_table_); + + template + void addBlock(const Block & block, DeduplicatioController & deduplication_controller); + /// Parameter 'table' is passed because it's sometimes pre-computed. It should + /// conform the 'destination_table'. 
+ void writeBlockToDestination(const Block & block, StoragePtr table); + + + void flush(bool check_thresholds = true, bool is_called_from_background = false); + void flushThread(); +}; + +} diff --git a/dbms/tests/integration/README.md b/dbms/tests/integration/README.md new file mode 100644 index 00000000000..b20a096b386 --- /dev/null +++ b/dbms/tests/integration/README.md @@ -0,0 +1,35 @@ +## ClickHouse integration tests + +This directory contains tests that involve several ClickHouse instances, custom configs, ZooKeeper, etc. + +### Running + +Prerequisites: +* [docker](https://www.docker.com/community-edition#/download). Minimum required API version: 1.25, check with `docker version`. +* [docker-compose](https://docs.docker.com/compose/). To install: `sudo pip install docker-compose` +* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo pip install pytest` + +If you want to run the tests under a non-privileged user, you must add this user to `docker` group: `sudo usermod -aG docker $USER`. + +Run the tests with the `pytest` command. To select which tests to run, use: `pytest -k ` + +By default tests are run with system-wide client binary, server binary and base configs. To change that, +set the following environment variables: +* `CLICKHOUSE_TESTS_SERVER_BIN_PATH` to choose the server binary. +* `CLICKHOUSE_TESTS_CLIENT_BIN_PATH` to choose the client binary. +* `CLICKHOUSE_TESTS_BASE_CONFIG_DIR` to choose the directory from which base configs (`config.xml` and + `users.xml`) are taken. + +### Adding new tests + +To add new test named `foo`, create a directory `test_foo` with an empty `__init__.py` and a file +named `test.py` containing tests in it. All functions with names starting with `test` will become test cases. + +`helpers` directory contains utilities for: +* Launching a ClickHouse cluster with or without ZooKeeper in docker containers. +* Sending queries to launched instances. +* Introducing network failures such as severing network link between two instances. + +To assert that two TSV files must be equal, wrap them in the `TSV` class and use the regular `assert` +statement. Example: `assert TSV(result) == TSV(reference)`. In case the assertion fails, `pytest` +will automagically detect the types of variables and only the small diff of two files is printed. 
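
For concreteness, here is a minimal sketch of what a `test_foo/test.py` could look like when wired up with the helpers described in this README. The instance name, the empty custom-config list and the module-level fixture layout are illustrative assumptions, not part of this change; only the helper APIs (`ClickHouseCluster`, `add_instance`, `query`, `TSV`) come from the code added here.

```python
# test_foo/test.py -- illustrative sketch only; names and fixture layout are assumptions.
import pytest

from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV

cluster = ClickHouseCluster(__file__)
# Second argument is the list of extra config files copied into config.d/ (none here).
instance = cluster.add_instance('node1', [])


@pytest.fixture(scope='module')
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def test_select(started_cluster):
    result = instance.query('SELECT 1')
    assert TSV(result) == TSV('1\n')
```

pytest picks up `test_select` automatically because its name starts with `test`, as described above.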
diff --git a/dbms/tests/integration/conftest.py b/dbms/tests/integration/conftest.py new file mode 100644 index 00000000000..167524b9582 --- /dev/null +++ b/dbms/tests/integration/conftest.py @@ -0,0 +1,5 @@ +from helpers.test_tools import TSV + +def pytest_assertrepr_compare(op, left, right): + if isinstance(left, TSV) and isinstance(right, TSV) and op == '==': + return ['TabSeparated values differ: '] + left.diff(right) diff --git a/dbms/tests/integration/helpers/__init__.py b/dbms/tests/integration/helpers/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/helpers/client.py b/dbms/tests/integration/helpers/client.py new file mode 100644 index 00000000000..cbd08c75d53 --- /dev/null +++ b/dbms/tests/integration/helpers/client.py @@ -0,0 +1,44 @@ +import errno +import subprocess as sp +from threading import Timer + + +class Client: + def __init__(self, host, port=9000, command='/usr/bin/clickhouse-client'): + self.host = host + self.port = port + self.command = [command, '--host', self.host, '--port', str(self.port)] + + def query(self, sql, stdin=None, timeout=10.0): + if stdin is None: + command = self.command + ['--multiquery'] + stdin = sql + else: + command = self.command + ['--query', sql] + + process = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE) + + timer = None + if timeout is not None: + def kill_process(): + try: + process.kill() + except OSError as e: + if e.errno != errno.ESRCH: + raise + + timer = Timer(timeout, kill_process) + timer.start() + + stdout, stderr = process.communicate(stdin) + + if timer is not None: + if timer.finished.is_set(): + raise Exception('Client timed out!') + else: + timer.cancel() + + if process.returncode != 0: + raise Exception('Client failed! return code: {}, stderr: {}'.format(process.returncode, stderr)) + + return stdout diff --git a/dbms/tests/integration/helpers/cluster.py b/dbms/tests/integration/helpers/cluster.py new file mode 100644 index 00000000000..56a38069fa2 --- /dev/null +++ b/dbms/tests/integration/helpers/cluster.py @@ -0,0 +1,229 @@ +import os +import os.path as p +import re +import subprocess +import shutil +import socket +import time +import errno + +import docker + +from .client import Client + + +HELPERS_DIR = p.dirname(__file__) + + +class ClickHouseCluster: + """ClickHouse cluster with several instances and (possibly) ZooKeeper. + + Add instances with several calls to add_instance(), then start them with the start() call. + + Directories for instances are created in the directory of base_path. After cluster is started, + these directories will contain logs, database files, docker-compose config, ClickHouse configs etc. + """ + + def __init__(self, base_path, base_configs_dir=None, server_bin_path=None, client_bin_path=None): + self.base_dir = p.dirname(base_path) + + self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR', '/etc/clickhouse-server/') + self.server_bin_path = server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse') + self.client_bin_path = client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client') + + self.project_name = os.getlogin() + p.basename(self.base_dir) + # docker-compose removes everything non-alphanumeric from project names so we do it too. 
+ self.project_name = re.sub(r'[^a-z0-9]', '', self.project_name.lower()) + + self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name] + self.instances = {} + self.with_zookeeper = False + self.is_up = False + + + def add_instance(self, name, custom_configs, with_zookeeper=False): + """Add an instance to the cluster. + + name - the name of the instance directory and the value of the 'instance' macro in ClickHouse. + custom_configs - a list of config files that will be added to config.d/ directory + with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster. + """ + + if self.is_up: + raise Exception('Can\'t add instance %s: cluster is already up!' % name) + + if name in self.instances: + raise Exception('Can\'t add instance %s: there is already an instance with the same name!' % name) + + instance = ClickHouseInstance(self.base_dir, name, custom_configs, with_zookeeper, self.base_configs_dir, self.server_bin_path) + self.instances[name] = instance + self.base_cmd.extend(['--file', instance.docker_compose_path]) + if with_zookeeper and not self.with_zookeeper: + self.with_zookeeper = True + self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]) + + return instance + + + def start(self, destroy_dirs=True): + if self.is_up: + return + + for instance in self.instances.values(): + instance.create_dir(destroy_dir=destroy_dirs) + + subprocess.check_call(self.base_cmd + ['up', '-d']) + + docker_client = docker.from_env() + for instance in self.instances.values(): + # According to how docker-compose names containers. + instance.docker_id = self.project_name + '_' + instance.name + '_1' + + container = docker_client.containers.get(instance.docker_id) + instance.ip_address = container.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress'] + + instance.wait_for_start() + + instance.client = Client(instance.ip_address, command=self.client_bin_path) + + self.is_up = True + + + def shutdown(self, kill=True): + if kill: + subprocess.check_call(self.base_cmd + ['kill']) + subprocess.check_call(self.base_cmd + ['down', '--volumes']) + self.is_up = False + + for instance in self.instances.values(): + instance.docker_id = None + instance.ip_address = None + instance.client = None + + +DOCKER_COMPOSE_TEMPLATE = ''' +version: '2' +services: + {name}: + image: ubuntu:14.04 + user: '{uid}' + volumes: + - {binary_path}:/usr/bin/clickhouse:ro + - {configs_dir}:/etc/clickhouse-server/ + - {db_dir}:/var/lib/clickhouse/ + - {logs_dir}:/var/log/clickhouse-server/ + entrypoint: + - /usr/bin/clickhouse + - --config-file=/etc/clickhouse-server/config.xml + - --log-file=/var/log/clickhouse-server/clickhouse-server.log + depends_on: {depends_on} +''' + +MACROS_CONFIG_TEMPLATE = ''' + + + {name} + + +''' + +class ClickHouseInstance: + def __init__( + self, base_path, name, custom_configs, with_zookeeper, + base_configs_dir, server_bin_path): + + self.name = name + self.custom_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_configs] + self.with_zookeeper = with_zookeeper + + self.base_configs_dir = base_configs_dir + self.server_bin_path = server_bin_path + + self.path = p.abspath(p.join(base_path, name)) + self.docker_compose_path = p.join(self.path, 'docker_compose.yml') + + self.docker_id = None + self.ip_address = None + self.client = None + + + def query(self, sql, stdin=None): + return self.client.query(sql, stdin) + + + def wait_for_start(self, timeout=10.0): + deadline 
= time.time() + timeout + while True: + if time.time() >= deadline: + raise Exception("Timed out while waiting for instance {} with ip address {} to start".format(self.name, self.ip_address)) + + # Repeatedly poll the instance address until there is something that listens there. + # Usually it means that ClickHouse is ready to accept queries. + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.ip_address, 9000)) + return + except socket.error as e: + if e.errno == errno.ECONNREFUSED: + time.sleep(0.1) + else: + raise + finally: + sock.close() + + + def create_dir(self, destroy_dir=True): + """Create the instance directory and all the needed files there.""" + + if destroy_dir: + self.destroy_dir() + elif p.exists(self.path): + return + + os.mkdir(self.path) + + configs_dir = p.join(self.path, 'configs') + os.mkdir(configs_dir) + + shutil.copy(p.join(self.base_configs_dir, 'config.xml'), configs_dir) + shutil.copy(p.join(self.base_configs_dir, 'users.xml'), configs_dir) + + config_d_dir = p.join(configs_dir, 'config.d') + os.mkdir(config_d_dir) + + shutil.copy(p.join(HELPERS_DIR, 'common_instance_config.xml'), config_d_dir) + + with open(p.join(config_d_dir, 'macros.xml'), 'w') as macros_config: + macros_config.write(MACROS_CONFIG_TEMPLATE.format(name=self.name)) + + if self.with_zookeeper: + shutil.copy(p.join(HELPERS_DIR, 'zookeeper_config.xml'), config_d_dir) + + for path in self.custom_config_paths: + shutil.copy(path, config_d_dir) + + db_dir = p.join(self.path, 'database') + os.mkdir(db_dir) + + logs_dir = p.join(self.path, 'logs') + os.mkdir(logs_dir) + + depends_on = '[]' + if self.with_zookeeper: + depends_on = '["zoo1", "zoo2", "zoo3"]' + + with open(self.docker_compose_path, 'w') as docker_compose: + docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format( + name=self.name, + uid=os.getuid(), + binary_path=self.server_bin_path, + configs_dir=configs_dir, + config_d_dir=config_d_dir, + db_dir=db_dir, + logs_dir=logs_dir, + depends_on=depends_on)) + + + def destroy_dir(self): + if p.exists(self.path): + shutil.rmtree(self.path) diff --git a/dbms/tests/integration/helpers/common_instance_config.xml b/dbms/tests/integration/helpers/common_instance_config.xml new file mode 100644 index 00000000000..ff357185cee --- /dev/null +++ b/dbms/tests/integration/helpers/common_instance_config.xml @@ -0,0 +1,4 @@ + + Europe/Moscow + :: + diff --git a/dbms/tests/integration/helpers/docker_compose_zookeeper.yml b/dbms/tests/integration/helpers/docker_compose_zookeeper.yml new file mode 100644 index 00000000000..365744f106d --- /dev/null +++ b/dbms/tests/integration/helpers/docker_compose_zookeeper.yml @@ -0,0 +1,25 @@ +version: '2' +services: + zoo1: + image: zookeeper + restart: always + environment: + ZOO_TICK_TIME: 500 + ZOO_MY_ID: 1 + ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 + + zoo2: + image: zookeeper + restart: always + environment: + ZOO_TICK_TIME: 500 + ZOO_MY_ID: 2 + ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 + + zoo3: + image: zookeeper + restart: always + environment: + ZOO_TICK_TIME: 500 + ZOO_MY_ID: 3 + ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 diff --git a/dbms/tests/integration/helpers/helper_container/Dockerfile b/dbms/tests/integration/helpers/helper_container/Dockerfile new file mode 100644 index 00000000000..57c52749d52 --- /dev/null +++ b/dbms/tests/integration/helpers/helper_container/Dockerfile @@ -0,0 +1,4 @@ +# 
Helper docker container to run iptables without sudo
+
+FROM alpine
+RUN apk add -U iproute2
diff --git a/dbms/tests/integration/helpers/network.py b/dbms/tests/integration/helpers/network.py
new file mode 100644
index 00000000000..97317ed1e3d
--- /dev/null
+++ b/dbms/tests/integration/helpers/network.py
@@ -0,0 +1,159 @@
+import os.path as p
+import subprocess
+import time
+
+import docker
+
+from .cluster import HELPERS_DIR
+
+
+class PartitionManager:
+    """Allows introducing failures in the network between docker containers.
+
+    Can act as a context manager:
+
+    with PartitionManager() as pm:
+        pm.partition_instances(instance1, instance2)
+        ...
+        # At exit all partitions are removed automatically.
+
+    """
+
+    def __init__(self):
+        self._iptables_rules = []
+
+    def isolate_instance_from_zk(self, instance, action='DROP'):
+        self._check_instance(instance)
+
+        self._add_rule({'source': instance.ip_address, 'destination_port': 2181, 'action': action})
+        self._add_rule({'destination': instance.ip_address, 'source_port': 2181, 'action': action})
+
+    def partition_instances(self, left, right, action='DROP'):
+        self._check_instance(left)
+        self._check_instance(right)
+
+        self._add_rule({'source': left.ip_address, 'destination': right.ip_address, 'action': action})
+        self._add_rule({'source': right.ip_address, 'destination': left.ip_address, 'action': action})
+
+    def heal_all(self):
+        while self._iptables_rules:
+            rule = self._iptables_rules.pop()
+            _NetworkManager.get().delete_iptables_rule(**rule)
+
+    @staticmethod
+    def _check_instance(instance):
+        if instance.ip_address is None:
+            raise Exception('Instance ' + instance.name + ' is not launched!')
+
+    def _add_rule(self, rule):
+        _NetworkManager.get().add_iptables_rule(**rule)
+        self._iptables_rules.append(rule)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.heal_all()
+
+
+class _NetworkManager:
+    """Execute commands inside a container with access to network settings.
+
+    We need to call iptables to create partitions, but we want to avoid sudo.
+    The way to circumvent this restriction is to run iptables in a container with network=host.
+    The container is long-running and periodically renewed - this is an optimization to avoid the overhead
+    of container creation on each call.
+    Source of the idea: https://github.com/worstcase/blockade/blob/master/blockade/host.py
+    """
+
+    # Singleton instance.
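+    # Created lazily on the first get() call and shared by all PartitionManagers in the process.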
+ _instance = None + + @classmethod + def get(cls, **kwargs): + if cls._instance is None: + cls._instance = cls(**kwargs) + return cls._instance + + def add_iptables_rule(self, **kwargs): + cmd = ['iptables', '-A', 'DOCKER'] + cmd.extend(self._iptables_cmd_suffix(**kwargs)) + self._exec_run(cmd, privileged=True) + + def delete_iptables_rule(self, **kwargs): + cmd = ['iptables', '-D', 'DOCKER'] + cmd.extend(self._iptables_cmd_suffix(**kwargs)) + self._exec_run(cmd, privileged=True) + + @staticmethod + def _iptables_cmd_suffix( + source=None, destination=None, + source_port=None, destination_port=None, + action=None): + ret = [] + if source is not None: + ret.extend(['-s', source]) + if destination is not None: + ret.extend(['-d', destination]) + if source_port is not None: + ret.extend(['-p', 'tcp', '--sport', str(source_port)]) + if destination_port is not None: + ret.extend(['-p', 'tcp', '--dport', str(destination_port)]) + if action is not None: + ret.extend(['-j', action]) + return ret + + + def __init__( + self, + image_name='clickhouse_tests_helper', + image_path=p.join(HELPERS_DIR, 'helper_container'), + container_expire_timeout=50, container_exit_timeout=60): + + self.container_expire_timeout = container_expire_timeout + self.container_exit_timeout = container_exit_timeout + + self._docker_client = docker.from_env() + + try: + self._image = self._docker_client.images.get(image_name) + except docker.errors.ImageNotFound: + self._image = self._docker_client.images.build(tag=image_name, path=image_path, rm=True) + + self._container = None + + self._ensure_container() + + def _ensure_container(self): + if self._container is None or self._container_expire_time <= time.time(): + + if self._container is not None: + try: + self._container.remove(force=True) + except docker.errors.NotFound: + pass + + # Work around https://github.com/docker/docker-py/issues/1477 + host_config = self._docker_client.api.create_host_config(network_mode='host', auto_remove=True) + container_id = self._docker_client.api.create_container( + self._image.id, command=('sleep %s' % self.container_exit_timeout), + detach=True, host_config=host_config)['Id'] + + self._container_expire_time = time.time() + self.container_expire_timeout + self._docker_client.api.start(container_id) + self._container = self._docker_client.containers.get(container_id) + + return self._container + + def _exec_run(self, cmd, **kwargs): + container = self._ensure_container() + + handle = self._docker_client.api.exec_create(container.id, cmd, **kwargs) + output = self._docker_client.api.exec_start(handle).decode('utf8') + exit_code = self._docker_client.api.exec_inspect(handle)['ExitCode'] + + if exit_code != 0: + print output + raise subprocess.CalledProcessError(exit_code, cmd) + + return output diff --git a/dbms/tests/integration/helpers/test_tools.py b/dbms/tests/integration/helpers/test_tools.py new file mode 100644 index 00000000000..b5f5ee9f66a --- /dev/null +++ b/dbms/tests/integration/helpers/test_tools.py @@ -0,0 +1,13 @@ +import difflib + +class TSV: + """Helper to get pretty diffs between expected and actual tab-separated value files""" + + def __init__(self, contents): + self.lines = contents.readlines() if isinstance(contents, file) else contents.splitlines(True) + + def __eq__(self, other): + return self.lines == other.lines + + def diff(self, other): + return list(line.rstrip() for line in difflib.context_diff(self.lines, other.lines))[2:] diff --git a/dbms/tests/integration/helpers/zookeeper_config.xml 
b/dbms/tests/integration/helpers/zookeeper_config.xml
new file mode 100644
index 00000000000..39a86cd999a
--- /dev/null
+++ b/dbms/tests/integration/helpers/zookeeper_config.xml
@@ -0,0 +1,17 @@
+<yandex>
+    <zookeeper>
+        <node index="1">
+            <host>zoo1</host>
+            <port>2181</port>
+        </node>
+        <node index="2">
+            <host>zoo2</host>
+            <port>2181</port>
+        </node>
+        <node index="3">
+            <host>zoo3</host>
+            <port>2181</port>
+        </node>
+        <session_timeout_ms>1000</session_timeout_ms>
+    </zookeeper>
+</yandex>
diff --git a/dbms/tests/integration/pytest.ini b/dbms/tests/integration/pytest.ini
new file mode 100644
index 00000000000..98229d4dc1d
--- /dev/null
+++ b/dbms/tests/integration/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+python_files = test.py
diff --git a/dbms/tests/integration/test_delayed_replica_failover/__init__.py b/dbms/tests/integration/test_delayed_replica_failover/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/dbms/tests/integration/test_delayed_replica_failover/configs/remote_servers.xml b/dbms/tests/integration/test_delayed_replica_failover/configs/remote_servers.xml
new file mode 100644
index 00000000000..014e7bf253e
--- /dev/null
+++ b/dbms/tests/integration/test_delayed_replica_failover/configs/remote_servers.xml
@@ -0,0 +1,17 @@
+<yandex>
+    <remote_servers>
+        <test_cluster>
+            <shard>
+                <internal_replication>true</internal_replication>
+                <replica>
+                    <host>replica1</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>replica2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster>
+    </remote_servers>
+</yandex>
diff --git a/dbms/tests/integration/test_delayed_replica_failover/test.py b/dbms/tests/integration/test_delayed_replica_failover/test.py
new file mode 100644
index 00000000000..a04a83b641e
--- /dev/null
+++ b/dbms/tests/integration/test_delayed_replica_failover/test.py
@@ -0,0 +1,74 @@
+import pytest
+import time
+
+from helpers.cluster import ClickHouseCluster
+from helpers.network import PartitionManager
+
+
+cluster = ClickHouseCluster(__file__)
+
+instance_with_dist_table = cluster.add_instance('instance_with_dist_table', ['configs/remote_servers.xml'])
+replica1 = cluster.add_instance('replica1', [], with_zookeeper=True)
+replica2 = cluster.add_instance('replica2', [], with_zookeeper=True)
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+
+        for replica in (replica1, replica2):
+            replica.query(
+                "CREATE TABLE replicated (d Date, x UInt32) ENGINE = "
+                "ReplicatedMergeTree('/clickhouse/tables/replicated', '{instance}', d, d, 8192)")
+
+        instance_with_dist_table.query(
+            "CREATE TABLE distributed (d Date, x UInt32) ENGINE = "
+            "Distributed('test_cluster', 'default', 'replicated')")
+
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+def test(started_cluster):
+    with PartitionManager() as pm:
+        pm.partition_instances(replica1, replica2)
+
+        replica2.query("INSERT INTO replicated VALUES ('2017-05-08', 1)")
+
+        time.sleep(1) # accrue replica delay
+
+        assert replica1.query("SELECT count() FROM replicated").strip() == '0'
+        assert replica2.query("SELECT count() FROM replicated").strip() == '1'
+
+        # With in_order balancing replica1 is chosen.
+        assert instance_with_dist_table.query(
+            "SELECT count() FROM distributed SETTINGS load_balancing='in_order'").strip() == '0'
+
+        # When we set max_replica_delay, replica1 must be excluded.
+        assert instance_with_dist_table.query('''
+SELECT count() FROM distributed SETTINGS
+    load_balancing='in_order',
+    max_replica_delay_for_distributed_queries=1
+''').strip() == '1'
+
+        pm.isolate_instance_from_zk(replica2)
+
+        time.sleep(2) # allow pings to zookeeper to timeout
+
+        # At this point all replicas are stale, but the query must still go to replica2 which is the least stale one.
+        assert instance_with_dist_table.query('''
+SELECT count() FROM distributed SETTINGS
+    load_balancing='in_order',
+    max_replica_delay_for_distributed_queries=1
+''').strip() == '1'
+
+        # If we forbid stale replicas, the query must fail.
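+        # With fallback_to_stale_replicas_for_distributed_queries=0 an exception is raised when no replica
+        # satisfies max_replica_delay, instead of falling back to the least stale replica.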
+ with pytest.raises(Exception): + instance_with_dist_table.query(''' +SELECT count() FROM distributed SETTINGS + load_balancing='in_order', + max_replica_delay_for_distributed_queries=1, + fallback_to_stale_replicas_for_distributed_queries=0 +''') diff --git a/dbms/tests/integration/test_graphite_merge_tree/__init__.py b/dbms/tests/integration/test_graphite_merge_tree/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/test_graphite_merge_tree/configs/graphite_rollup.xml b/dbms/tests/integration/test_graphite_merge_tree/configs/graphite_rollup.xml new file mode 100644 index 00000000000..1390d151731 --- /dev/null +++ b/dbms/tests/integration/test_graphite_merge_tree/configs/graphite_rollup.xml @@ -0,0 +1,25 @@ + + + + metric + timestamp + value + updated + + ^one_min + avg + + 0 + 60 + + + 7776000 + 300 + + + 31536000 + 600 + + + + diff --git a/dbms/tests/integration/test_graphite_merge_tree/test.py b/dbms/tests/integration/test_graphite_merge_tree/test.py new file mode 100644 index 00000000000..46637d47f78 --- /dev/null +++ b/dbms/tests/integration/test_graphite_merge_tree/test.py @@ -0,0 +1,216 @@ +import os.path as p +import time +import datetime +import pytest + +from helpers.cluster import ClickHouseCluster +from helpers.test_tools import TSV + + +cluster = ClickHouseCluster(__file__) +instance = cluster.add_instance('instance', ['configs/graphite_rollup.xml']) + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + instance.query('CREATE DATABASE test') + + yield cluster + + finally: + cluster.shutdown() + +@pytest.fixture +def graphite_table(started_cluster): + instance.query(''' +DROP TABLE IF EXISTS test.graphite; +CREATE TABLE test.graphite + (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) + ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup'); +''') + + yield + + instance.query('DROP TABLE test.graphite') + + +def test_rollup_versions(graphite_table): + timestamp = int(time.time()) + rounded_timestamp = timestamp - timestamp % 60 + date = datetime.date.today().isoformat() + + q = instance.query + + # Insert rows with timestamps relative to the current time so that the first retention clause is active. + # Two parts are created. + q(''' +INSERT INTO test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 100, {timestamp}, '{date}', 1); +INSERT INTO test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 200, {timestamp}, '{date}', 2); +'''.format(timestamp=timestamp, date=date)) + + expected1 = '''\ +one_min.x1 100 {timestamp} {date} 1 +one_min.x1 200 {timestamp} {date} 2 +'''.format(timestamp=timestamp, date=date) + + assert TSV(q('SELECT * FROM test.graphite ORDER BY updated')) == TSV(expected1) + + q('OPTIMIZE TABLE test.graphite') + + # After rollup only the row with max version is retained. + expected2 = '''\ +one_min.x1 200 {timestamp} {date} 2 +'''.format(timestamp=rounded_timestamp, date=date) + + assert TSV(q('SELECT * FROM test.graphite')) == TSV(expected2) + + +def test_rollup_aggregation(graphite_table): + q = instance.query + + # This query essentially emulates what rollup does. 
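+    # For each source timestamp it keeps the value of the row with the maximum (updated, number),
+    # then averages those values over the 600-second bucket and reports the maximum version.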
+ result1 = q(''' +SELECT avg(v), max(upd) +FROM (SELECT timestamp, + argMax(value, (updated, number)) AS v, + max(updated) AS upd + FROM (SELECT 'one_min.x5' AS metric, + toFloat64(number) AS value, + toUInt32(1111111111 + intDiv(number, 3)) AS timestamp, + toDate('2017-02-02') AS date, + toUInt32(intDiv(number, 2)) AS updated, + number + FROM system.numbers LIMIT 1000000) + WHERE intDiv(timestamp, 600) * 600 = 1111444200 + GROUP BY timestamp) +''') + + expected1 = '''\ +999634.9918367347 499999 +''' + assert TSV(result1) == TSV(expected1) + + # Timestamp 1111111111 is in sufficiently distant past so that the last retention clause is active. + result2 = q(''' +INSERT INTO test.graphite + SELECT 'one_min.x' AS metric, + toFloat64(number) AS value, + toUInt32(1111111111 + intDiv(number, 3)) AS timestamp, + toDate('2017-02-02') AS date, toUInt32(intDiv(number, 2)) AS updated + FROM (SELECT * FROM system.numbers LIMIT 1000000) + WHERE intDiv(timestamp, 600) * 600 = 1111444200; + +OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL; + +SELECT * FROM test.graphite; +''') + + expected2 = '''\ +one_min.x 999634.9918367347 1111444200 2017-02-02 499999 +''' + + assert TSV(result2) == TSV(expected2) + + +def test_rollup_aggregation_2(graphite_table): + result = instance.query(''' +INSERT INTO test.graphite + SELECT 'one_min.x' AS metric, + toFloat64(number) AS value, + toUInt32(1111111111 - intDiv(number, 3)) AS timestamp, + toDate('2017-02-02') AS date, + toUInt32(100 - number) AS updated + FROM (SELECT * FROM system.numbers LIMIT 50); + +OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL; + +SELECT * FROM test.graphite; +''') + + expected = '''\ +one_min.x 24 1111110600 2017-02-02 100 +''' + + assert TSV(result) == TSV(expected) + + +def test_multiple_paths_and_versions(graphite_table): + result = instance.query(''' +INSERT INTO test.graphite + SELECT 'one_min.x' AS metric, + toFloat64(number) AS value, + toUInt32(1111111111 + intDiv(number, 3) * 600) AS timestamp, + toDate('2017-02-02') AS date, + toUInt32(100 - number) AS updated + FROM (SELECT * FROM system.numbers LIMIT 50); + +OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL; + +SELECT * FROM test.graphite; + + +INSERT INTO test.graphite + SELECT 'one_min.y' AS metric, + toFloat64(number) AS value, + toUInt32(1111111111 + number * 600) AS timestamp, + toDate('2017-02-02') AS date, + toUInt32(100 - number) AS updated + FROM (SELECT * FROM system.numbers LIMIT 50); + +OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL; + +SELECT * FROM test.graphite; +''') + + with open(p.join(p.dirname(__file__), 'test_multiple_paths_and_versions.reference')) as reference: + assert TSV(result) == TSV(reference) + + +def test_multiple_output_blocks(graphite_table): + MERGED_BLOCK_SIZE = 8192 + + to_insert = '' + expected = '' + for i in range(2 * MERGED_BLOCK_SIZE + 1): + rolled_up_time = 1000000200 + 600 * i + + for j in range(3): + cur_time = rolled_up_time + 100 * j + to_insert += 'one_min.x1 {} {} 2001-09-09 1\n'.format(10 * j, cur_time) + to_insert += 'one_min.x1 {} {} 2001-09-09 2\n'.format(10 * (j + 1), cur_time) + + expected += 'one_min.x1 20 {} 2001-09-09 2\n'.format(rolled_up_time) + + instance.query('INSERT INTO test.graphite FORMAT TSV', to_insert) + + result = instance.query(''' +OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL; + +SELECT * FROM test.graphite; +''') + + assert TSV(result) == TSV(expected) + + +def test_paths_not_matching_any_pattern(graphite_table): + to_insert = '''\ +one_min.x1 100 1000000000 2001-09-09 1 +zzzzzzzz 100 
1000000001 2001-09-09 1 +zzzzzzzz 200 1000000001 2001-09-09 2 +''' + + instance.query('INSERT INTO test.graphite FORMAT TSV', to_insert) + + expected = '''\ +one_min.x1 100 999999600 2001-09-09 1 +zzzzzzzz 200 1000000001 2001-09-09 2 +''' + + result = instance.query(''' +OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL; + +SELECT * FROM test.graphite; +''') + + assert TSV(result) == TSV(expected) diff --git a/dbms/tests/integration_drafts/graphite_merge_tree/test4.reference b/dbms/tests/integration/test_graphite_merge_tree/test_multiple_paths_and_versions.reference similarity index 100% rename from dbms/tests/integration_drafts/graphite_merge_tree/test4.reference rename to dbms/tests/integration/test_graphite_merge_tree/test_multiple_paths_and_versions.reference diff --git a/dbms/tests/integration_drafts/graphite_merge_tree/config.d/graphite_rollup.xml b/dbms/tests/integration_drafts/graphite_merge_tree/config.d/graphite_rollup.xml deleted file mode 100644 index 2e3cb9990d8..00000000000 --- a/dbms/tests/integration_drafts/graphite_merge_tree/config.d/graphite_rollup.xml +++ /dev/null @@ -1,128 +0,0 @@ - - - - metric - timestamp - value - updated - - ^one_sec - avg - - 0 - 1 - - - 86400 - 5 - - - 604800 - 60 - - - 7776000 - 300 - - - 31536000 - 600 - - - - ^five_sec - avg - - 0 - 5 - - - 604800 - 60 - - - 7776000 - 300 - - - 31536000 - 600 - - - - ^one_min - avg - - 0 - 60 - - - 7776000 - 300 - - - 31536000 - 600 - - - - ^five_min - avg - - 0 - 300 - - - 31536000 - 600 - - - - ^ten_min - avg - - 0 - 600 - - - - ^half_hour - avg - - 0 - 1800 - - - - ^one_hour - avg - - 0 - 3600 - - - - ^one_day - avg - - 0 - 86400 - - - - avg - - 0 - 60 - - - 2592000 - 300 - - - 31536000 - 600 - - - - diff --git a/dbms/tests/integration_drafts/graphite_merge_tree/test1.reference b/dbms/tests/integration_drafts/graphite_merge_tree/test1.reference deleted file mode 100644 index 11bcb7bd42d..00000000000 --- a/dbms/tests/integration_drafts/graphite_merge_tree/test1.reference +++ /dev/null @@ -1,3 +0,0 @@ -one_min.x1 100 1486048740 2017-02-02 1 -one_min.x1 200 1486048740 2017-02-02 2 -one_min.x1 200 1486048740 2017-02-02 2 diff --git a/dbms/tests/integration_drafts/graphite_merge_tree/test1.sql b/dbms/tests/integration_drafts/graphite_merge_tree/test1.sql deleted file mode 100644 index ea83e324b59..00000000000 --- a/dbms/tests/integration_drafts/graphite_merge_tree/test1.sql +++ /dev/null @@ -1,13 +0,0 @@ -DROP TABLE IF EXISTS test.graphite; -CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup'); - -INSERT into test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 100, toUInt32(toDateTime('2017-02-02 18:19:00')), toDate('2017-02-02'), 1); -INSERT into test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 200, toUInt32(toDateTime('2017-02-02 18:19:00')), toDate('2017-02-02'), 2); - -SELECT * FROM test.graphite ORDER BY updated; - -OPTIMIZE TABLE test.graphite; - -SELECT * FROM test.graphite ORDER BY updated; - -DROP TABLE test.graphite; diff --git a/dbms/tests/integration_drafts/graphite_merge_tree/test2.reference b/dbms/tests/integration_drafts/graphite_merge_tree/test2.reference deleted file mode 100644 index 1c78d9bc9dc..00000000000 --- a/dbms/tests/integration_drafts/graphite_merge_tree/test2.reference +++ /dev/null @@ -1,2 +0,0 @@ -one_min.x 999636.4856809663 1111444200 2017-02-02 499999 -999634.9918367347 499999 diff --git 
a/dbms/tests/integration_drafts/graphite_merge_tree/test2.sql b/dbms/tests/integration_drafts/graphite_merge_tree/test2.sql deleted file mode 100644 index f19daded239..00000000000 --- a/dbms/tests/integration_drafts/graphite_merge_tree/test2.sql +++ /dev/null @@ -1,10 +0,0 @@ -DROP TABLE IF EXISTS test.graphite; -CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup'); - -INSERT INTO test.graphite SELECT 'one_min.x' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + intDiv(number, 3)) AS timestamp, toDate('2017-02-02') AS date, toUInt32(intDiv(number, 2)) AS updated FROM (SELECT * FROM system.numbers LIMIT 1000000) WHERE intDiv(timestamp, 600) * 600 = 1111444200; -OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL; -SELECT * FROM test.graphite; - -SELECT avg(v), max(upd) FROM (SELECT timestamp, argMax(value, (updated, number)) AS v, max(updated) AS upd FROM (SELECT 'one_min.x5' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + intDiv(number, 3)) AS timestamp, toDate('2017-02-02') AS date, toUInt32(intDiv(number, 2)) AS updated, number FROM system.numbers LIMIT 1000000) WHERE intDiv(timestamp, 600) * 600 = 1111444200 GROUP BY timestamp); - -DROP TABLE test.graphite; diff --git a/dbms/tests/integration_drafts/graphite_merge_tree/test3.reference b/dbms/tests/integration_drafts/graphite_merge_tree/test3.reference deleted file mode 100644 index e89d931e0a5..00000000000 --- a/dbms/tests/integration_drafts/graphite_merge_tree/test3.reference +++ /dev/null @@ -1 +0,0 @@ -one_min.x 24 1111110600 2017-02-02 100 diff --git a/dbms/tests/integration_drafts/graphite_merge_tree/test3.sql b/dbms/tests/integration_drafts/graphite_merge_tree/test3.sql deleted file mode 100644 index e521abb285e..00000000000 --- a/dbms/tests/integration_drafts/graphite_merge_tree/test3.sql +++ /dev/null @@ -1,8 +0,0 @@ -DROP TABLE IF EXISTS test.graphite; -CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup'); - -INSERT INTO test.graphite SELECT 'one_min.x' AS metric, toFloat64(number) AS value, toUInt32(1111111111 - intDiv(number, 3)) AS timestamp, toDate('2017-02-02') AS date, toUInt32(100 - number) AS updated FROM (SELECT * FROM system.numbers LIMIT 50); -OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL; -SELECT * FROM test.graphite; - -DROP TABLE test.graphite; diff --git a/dbms/tests/integration_drafts/graphite_merge_tree/test4.sql b/dbms/tests/integration_drafts/graphite_merge_tree/test4.sql deleted file mode 100644 index 4c532337729..00000000000 --- a/dbms/tests/integration_drafts/graphite_merge_tree/test4.sql +++ /dev/null @@ -1,12 +0,0 @@ -DROP TABLE IF EXISTS test.graphite; -CREATE TABLE test.graphite (metric String, value Float64, timestamp UInt32, date Date, updated UInt32) ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup'); - -INSERT INTO test.graphite SELECT 'one_min.x' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + intDiv(number, 3) * 600) AS timestamp, toDate('2017-02-02') AS date, toUInt32(100 - number) AS updated FROM (SELECT * FROM system.numbers LIMIT 50); -OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL; -SELECT * FROM test.graphite; - -INSERT INTO test.graphite SELECT 'one_min.y' AS metric, toFloat64(number) AS value, toUInt32(1111111111 + number * 600) AS timestamp, 
toDate('2017-02-02') AS date, toUInt32(100 - number) AS updated FROM (SELECT * FROM system.numbers LIMIT 50); -OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL; -SELECT * FROM test.graphite; - -DROP TABLE test.graphite; diff --git a/dbms/tests/queries/0_stateless/00418_input_format_allow_errors.reference b/dbms/tests/queries/0_stateless/00418_input_format_allow_errors.reference index f3c764eaf9a..2f716683716 100644 --- a/dbms/tests/queries/0_stateless/00418_input_format_allow_errors.reference +++ b/dbms/tests/queries/0_stateless/00418_input_format_allow_errors.reference @@ -12,3 +12,5 @@ 3 Goodbye 1 Hello 3 Goodbye +1 TSKV +4 TSKV Ok diff --git a/dbms/tests/queries/0_stateless/00418_input_format_allow_errors.sh b/dbms/tests/queries/0_stateless/00418_input_format_allow_errors.sh index f5e1fc5e19e..204e1017fab 100755 --- a/dbms/tests/queries/0_stateless/00418_input_format_allow_errors.sh +++ b/dbms/tests/queries/0_stateless/00418_input_format_allow_errors.sh @@ -17,6 +17,8 @@ echo -ne '1\tHello\n2\n3\tGoodbye\n\n' | clickhouse-client --input_format_allow_ echo -ne '1\tHello\n2\n3\tGoodbye\n\n' | clickhouse-client --input_format_allow_errors_num=1 --input_format_allow_errors_ratio=0.6 --query="INSERT INTO test.formats_test FORMAT TSV" +echo -ne 'x=1\ts=TSKV\nx=minus2\ts=trash1\ns=trash2\tx=-3\ns=TSKV Ok\tx=4\ns=trash3\tx=-5\n' | clickhouse-client --input_format_allow_errors_num=3 -q "INSERT INTO test.formats_test FORMAT TSKV" + clickhouse-client --query="SELECT * FROM test.formats_test" clickhouse-client --query="DROP TABLE test.formats_test" diff --git a/dbms/tests/queries/0_stateless/00460_vertical_and_totals_extremes.reference b/dbms/tests/queries/0_stateless/00460_vertical_and_totals_extremes.reference new file mode 100644 index 00000000000..b142a014283 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00460_vertical_and_totals_extremes.reference @@ -0,0 +1,298 @@ +Row 1: +────── +k: 0 +count(): 20 + +Row 2: +────── +k: 1 +count(): 20 + +Row 3: +────── +k: 2 +count(): 20 + +Row 4: +────── +k: 3 +count(): 20 + +Row 5: +────── +k: 4 +count(): 20 + + +Totals: +─────── +k: 0 +count(): 100 +Row 1: +────── +k: 0 +count(): 20 + +Row 2: +────── +k: 1 +count(): 20 + +Row 3: +────── +k: 2 +count(): 20 + +Row 4: +────── +k: 3 +count(): 20 + +Row 5: +────── +k: 4 +count(): 20 + + +Totals: +─────── +k: 0 +count(): 100 + +Min: +──── +k: 0 +count(): 20 + +Max: +──── +k: 4 +count(): 20 +Row 1: +────── +k: 0 +count(): 20 + +Row 2: +────── +k: 1 +count(): 20 + +Row 3: +────── +k: 2 +count(): 20 + +Row 4: +────── +k: 3 +count(): 20 + +Row 5: +────── +k: 4 +count(): 20 + + +Totals: +─────── +k: 0 +count(): 100 + +Min: +──── +k: 0 +count(): 20 + +Max: +──── +k: 4 +count(): 20 +Row 1: +────── +k: 0 +count(): 20 + +Row 2: +────── +k: 1 +count(): 20 + +Row 3: +────── +k: 2 +count(): 20 + +Row 4: +────── +k: 3 +count(): 20 + + Showed first 4. + + +Totals: +─────── +k: 0 +count(): 100 + +Min: +──── +k: 0 +count(): 20 + +Max: +──── +k: 4 +count(): 20 +Row 1: +────── +k: 0 +count(): 20 + +Row 2: +────── +k: 1 +count(): 20 + +Row 3: +────── +k: 2 +count(): 20 + +Row 4: +────── +k: 3 +count(): 20 + + Showed first 4. + + +Totals: +─────── +k: 0 +count(): 100 + +Min: +──── +k: 0 +count(): 20 + +Max: +──── +k: 4 +count(): 20 +Row 1: +────── +k: 0 +count(): 20 + +Row 2: +────── +k: 1 +count(): 20 + +Row 3: +────── +k: 2 +count(): 20 + +Row 4: +────── +k: 3 +count(): 20 + + Showed first 4. 
+ + +Totals: +─────── +k: 0 +count(): 100 + +Min: +──── +k: 0 +count(): 20 + +Max: +──── +k: 4 +count(): 20 +Row 1: +────── +k: 0 +count(): 20 + +Row 2: +────── +k: 1 +count(): 20 + +Row 3: +────── +k: 2 +count(): 20 + +Row 4: +────── +k: 3 +count(): 20 + +Row 5: +────── +k: 4 +count(): 20 + + +Totals: +─────── +k: 0 +count(): 100 + +Min: +──── +k: 0 +count(): 20 + +Max: +──── +k: 4 +count(): 20 +Row 1: +────── +k: 0 +count(): 20 + +Row 2: +────── +k: 1 +count(): 20 + +Row 3: +────── +k: 2 +count(): 20 + +Row 4: +────── +k: 3 +count(): 20 + + Showed first 4. + + +Totals: +─────── +k: 0 +count(): 100 + +Min: +──── +k: 0 +count(): 20 + +Max: +──── +k: 4 +count(): 20 diff --git a/dbms/tests/queries/0_stateless/00460_vertical_and_totals_extremes.sql b/dbms/tests/queries/0_stateless/00460_vertical_and_totals_extremes.sql new file mode 100644 index 00000000000..137304ce713 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00460_vertical_and_totals_extremes.sql @@ -0,0 +1,22 @@ +SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT Vertical; + +SET extremes = 1; +SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT Vertical; + +SET output_format_pretty_max_rows = 5; +SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT Vertical; + +SET output_format_pretty_max_rows = 4; +SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT Vertical; + + +SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT VerticalRaw; + +SET extremes = 1; +SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT VerticalRaw; + +SET output_format_pretty_max_rows = 5; +SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT VerticalRaw; + +SET output_format_pretty_max_rows = 4; +SELECT k, count() FROM (SELECT number % 5 AS k FROM system.numbers LIMIT 100) GROUP BY k WITH TOTALS ORDER BY k FORMAT VerticalRaw; diff --git a/dbms/tests/queries/0_stateless/00461_default_value_of_argument_type.reference b/dbms/tests/queries/0_stateless/00461_default_value_of_argument_type.reference new file mode 100644 index 00000000000..32b90eabb16 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00461_default_value_of_argument_type.reference @@ -0,0 +1 @@ +[] [] (0,'','0000-00-00 00:00:00','0000-00-00') diff --git a/dbms/tests/queries/0_stateless/00461_default_value_of_argument_type.sql b/dbms/tests/queries/0_stateless/00461_default_value_of_argument_type.sql new file mode 100644 index 00000000000..f4786b5541e --- /dev/null +++ b/dbms/tests/queries/0_stateless/00461_default_value_of_argument_type.sql @@ -0,0 +1 @@ +SELECT defaultValueOfArgumentType([1, 2, 3]), defaultValueOfArgumentType([[[1]]]), defaultValueOfArgumentType((1, 'Hello', now(), today())); diff --git a/dbms/tests/queries/0_stateless/00462_json_true_false_literals.reference b/dbms/tests/queries/0_stateless/00462_json_true_false_literals.reference new file mode 100644 index 00000000000..77d2a070c6d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00462_json_true_false_literals.reference @@ -0,0 +1,4 @@ +0 0 +1 1 +0 false +1 true diff --git a/dbms/tests/queries/0_stateless/00462_json_true_false_literals.sql 
b/dbms/tests/queries/0_stateless/00462_json_true_false_literals.sql new file mode 100644 index 00000000000..1906ac6ebff --- /dev/null +++ b/dbms/tests/queries/0_stateless/00462_json_true_false_literals.sql @@ -0,0 +1,6 @@ +DROP TABLE IF EXISTS test.json; +CREATE TABLE test.json (x UInt8, title String) ENGINE = Memory; +INSERT INTO test.json FORMAT JSONEachRow {"x": true, "title": "true"}, {"x": false, "title": "false"}, {"x": 0, "title": "0"}, {"x": 1, "title": "1"} + +SELECT * FROM test.json ORDER BY title; +DROP TABLE IF EXISTS test.json; diff --git a/docs/ru/settings/index.rst b/docs/ru/settings/index.rst index aa51d89e264..6def0b38753 100644 --- a/docs/ru/settings/index.rst +++ b/docs/ru/settings/index.rst @@ -2,6 +2,7 @@ ========== Описанные в разделе настройки могут быть заданы следующими способами: + * Глобально. В конфигурационных файлах сервера. diff --git a/docs/ru/settings/settings.rst b/docs/ru/settings/settings.rst index 4d0c54fccd6..39efa6f738e 100644 --- a/docs/ru/settings/settings.rst +++ b/docs/ru/settings/settings.rst @@ -42,6 +42,35 @@ fallback_to_stale_replicas_for_distributed_queries По умолчанию - 1 (включена). + +input_format_allow_errors_num +----------------------------- +Устанавливает максимальное количество допустимых ошибок при чтении из текстовых форматов (CSV, TSV и т.п.). + +Значение по умолчанию - 0. + +Используйте обязательно в паре с ``input_format_allow_errors_ratio``. Для пропуска ошибок, значения обеих настроек должны быть больше 0. + +Если при чтении строки возникла ошибка, но при этом счетчик ошибок меньше ``input_format_allow_errors_num``, то ClickHouse игнорирует строку и переходит к следующей. + +В случае превышения ``input_format_allow_errors_num`` ClickHouse генерирует исключение. + + +input_format_allow_errors_ratio +------------------------------- +Устанавливает максимальную долю допустимых ошибок при чтении из текстовых форматов (CSV, TSV и т.п.). +Доля ошибок задаётся в виде числа с плавающей запятой от 0 до 1. + +Значение по умолчанию - 0. + +Используйте обязательно в паре с ``input_format_allow_errors_num``. Для пропуска ошибок, значения обеих настроек должны быть больше 0. + +Если при чтении строки возникла ошибка, но при этом текущая доля ошибок меньше ``input_format_allow_errors_ratio``, то ClickHouse игнорирует строку и переходит к следующей. + +В случае превышения ``input_format_allow_errors_ratio`` ClickHouse генерирует исключение. + + + max_block_size -------------- Данные в ClickHouse обрабатываются по блокам (наборам кусочков столбцов). Внутренние циклы обработки одного блока достаточно эффективны, но при этом существуют заметные издержки на каждый блок. ``max_block_size`` - это рекомендация, какого размера блоки (в количестве строк) загружать из таблицы. Размер блока должен быть не слишком маленьким, чтобы издержки на каждый блок оставались незаметными, и не слишком большим, чтобы запрос с LIMIT-ом, который завершается уже после первого блока, выполнялся быстро; чтобы не использовалось слишком много оперативки при вынимании большого количества столбцов в несколько потоков; чтобы оставалась хоть какая-нибудь кэш-локальность.