Merge remote-tracking branch 'upstream/master' into fix25

proller 2019-09-05 23:06:55 +03:00
commit f4f46c308f
61 changed files with 2644 additions and 223 deletions

View File

@ -372,8 +372,8 @@ if (USE_PROTOBUF)
endif ()
if (USE_HDFS)
target_link_libraries (clickhouse_common_io PRIVATE ${HDFS3_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${HDFS3_INCLUDE_DIR})
target_link_libraries (clickhouse_common_io PUBLIC ${HDFS3_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
endif()
if (USE_BROTLI)

View File

@ -449,6 +449,8 @@ namespace ErrorCodes
extern const int READONLY_SETTING = 472;
extern const int DEADLOCK_AVOIDED = 473;
extern const int INVALID_TEMPLATE_FORMAT = 474;
extern const int INVALID_WITH_FILL_EXPRESSION = 475;
extern const int WITH_TIES_WITHOUT_ORDER_BY = 476;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -0,0 +1,89 @@
#pragma once
#include <algorithm>
#include <Core/Block.h>
#include <Columns/IColumn.h>
#include <boost/smart_ptr/intrusive_ptr.hpp>
namespace DB
{
/// Allows you to refer to a row in a block and hold ownership of that block,
/// and thus avoid creating a temporary row object.
/// Does not use std::shared_ptr, since there is no need for the `weak_count` and `deleter` fields;
/// does not use Poco::SharedPtr, since the block and the `refcount` need to be allocated in one piece;
/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr.
/// The reference counter is not atomic, since it is used from a single thread.
namespace detail
{
struct SharedBlock : Block
{
int refcount = 0;
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedBlock(Block && block) : Block(std::move(block)) {}
};
}
inline void intrusive_ptr_add_ref(detail::SharedBlock * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedBlock * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
using SharedBlockPtr = boost::intrusive_ptr<detail::SharedBlock>;
struct SharedBlockRowRef
{
ColumnRawPtrs * columns = nullptr;
size_t row_num;
SharedBlockPtr shared_block;
void swap(SharedBlockRowRef & other)
{
std::swap(columns, other.columns);
std::swap(row_num, other.row_num);
std::swap(shared_block, other.shared_block);
}
/// The number and types of columns must match.
bool operator==(const SharedBlockRowRef & other) const
{
size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1))
return false;
return true;
}
bool operator!=(const SharedBlockRowRef & other) const
{
return !(*this == other);
}
void reset()
{
SharedBlockRowRef empty;
swap(empty);
}
bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); }
void set(SharedBlockPtr & shared_block_, ColumnRawPtrs * columns_, size_t row_num_)
{
shared_block = shared_block_;
columns = columns_;
row_num = row_num_;
}
};
}
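For orientation, a minimal usage sketch (not part of the diff; `block` is assumed to be a non-empty Block already produced by some stream). It mirrors how LimitBlockInputStream below wraps a block and points a row reference at its last row:

SharedBlockPtr shared = new detail::SharedBlock(std::move(block)); /// refcount is handled by the intrusive_ptr
for (const auto & column : shared->getColumns())
    shared->all_columns.push_back(column.get());
SharedBlockRowRef row_ref;
row_ref.set(shared, &shared->all_columns, shared->rows() - 1); /// refer to the last row
/// row_ref keeps the underlying block alive even after `shared` goes out of scope.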

View File

@ -0,0 +1,76 @@
#include <Common/parseGlobs.h>
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <algorithm>
#include <sstream>
namespace DB
{
/* Transforms a string from glob syntax ("{N..M}", "{a,b,c}" as in the remote table function, and "*", "?") to a Perl-style regexp for matching with the re2 library,
* in the following steps:
* 1) find ranges like {0..9} and enumerations like {abc,xyz,qwe} inside {}, and replace them with a pipe-separated alternation (expr1|expr2|expr3),
* 2) find and replace "*" and "?".
* Before each search, the symbols that should not take part in matching are escaped.
*
* There are a few examples in the unit tests.
*/
std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_globs)
{
std::ostringstream oss_for_escaping;
/// Escape only the characters that are not used in the glob syntax
for (const auto & letter : initial_str_with_globs)
{
if ((letter == '[') || (letter == ']') || (letter == '|') || (letter == '+') || (letter == '-') || (letter == '(') || (letter == ')'))
oss_for_escaping << '\\';
oss_for_escaping << letter;
}
std::string escaped_with_globs = oss_for_escaping.str();
static const re2::RE2 enum_or_range(R"({([\d]+\.\.[\d]+|[^{}*,]+,[^{}*]*[^{}*,])})"); /// regexp for {expr1,expr2,expr3} or {M..N}, where M and N are non-negative integers; the expressions must not contain {}*,
re2::StringPiece input(escaped_with_globs);
re2::StringPiece matched;
std::ostringstream oss_for_replacing;
size_t current_index = 0;
while (RE2::FindAndConsume(&input, enum_or_range, &matched))
{
std::string buffer = matched.ToString();
oss_for_replacing << escaped_with_globs.substr(current_index, matched.data() - escaped_with_globs.data() - current_index - 1) << '(';
if (buffer.find(',') == std::string::npos)
{
size_t range_begin, range_end;
char point;
std::istringstream iss_range(buffer);
iss_range >> range_begin >> point >> point >> range_end;
oss_for_replacing << range_begin;
for (size_t i = range_begin + 1; i <= range_end; ++i)
{
oss_for_replacing << '|' << i;
}
}
else
{
std::replace(buffer.begin(), buffer.end(), ',', '|');
oss_for_replacing << buffer;
}
oss_for_replacing << ")";
current_index = input.data() - escaped_with_globs.data();
}
oss_for_replacing << escaped_with_globs.substr(current_index);
std::string almost_res = oss_for_replacing.str();
std::ostringstream oss_final_processing;
for (const auto & letter : almost_res)
{
if ((letter == '?') || (letter == '*'))
{
oss_final_processing << "[^/]"; /// '?' is any symbol except '/'
if (letter == '?')
continue;
}
if ((letter == '.') || (letter == '{') || (letter == '}'))
oss_final_processing << '\\';
oss_final_processing << letter;
}
return oss_final_processing.str();
}
}
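A hedged usage sketch (the file name pattern is purely illustrative, not from the diff): the produced pattern is a complete regular expression, so candidate file names can be checked with re2 full matching.

re2::RE2 matcher(makeRegexpPatternFromGlobs("file{1..3}.csv")); /// the pattern becomes "file(1|2|3)\.csv"
bool hit = re2::RE2::FullMatch("file2.csv", matcher);  /// true
bool miss = re2::RE2::FullMatch("file4.csv", matcher); /// false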

View File

@ -0,0 +1,10 @@
#pragma once
#include <string>
#include <vector>
namespace DB
{
/* Parses globs in a string and makes a regexp for it.
*/
std::string makeRegexpPatternFromGlobs(const std::string & path);
}

View File

@ -0,0 +1,20 @@
#include <Common/parseGlobs.h>
#include <re2/re2.h>
#include <gtest/gtest.h>
using namespace DB;
TEST(Common, makeRegexpPatternFromGlobs)
{
EXPECT_EQ(makeRegexpPatternFromGlobs("f{01..09}"), "f(1|2|3|4|5|6|7|8|9)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{01..9}"), "f(1|2|3|4|5|6|7|8|9)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{0001..0000009}"), "f(1|2|3|4|5|6|7|8|9)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{1..2}{1..2}"), "f(1|2)(1|2)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{1..1}{1..1}"), "f(1)(1)");
EXPECT_EQ(makeRegexpPatternFromGlobs("f{0..0}{0..0}"), "f(0)(0)");
EXPECT_EQ(makeRegexpPatternFromGlobs("file{1..5}"),"file(1|2|3|4|5)");
EXPECT_EQ(makeRegexpPatternFromGlobs("file{1,2,3}"),"file(1|2|3)");
EXPECT_EQ(makeRegexpPatternFromGlobs("{1,2,3}blabla{a.x,b.x,c.x}smth[]_else{aa,bb}?*"), "(1|2|3)blabla(a\\.x|b\\.x|c\\.x)smth\\[\\]_else(aa|bb)[^/][^/]*");
}

View File

@ -4,13 +4,22 @@
#include <memory>
#include <cstddef>
#include <string>
#include <Core/Field.h>
class Collator;
namespace DB
{
struct FillColumnDescription
{
/// All missing values in the range [FROM, TO) will be filled.
/// The range [FROM, TO) respects the sorting direction.
Field fill_from; /// Fill value >= FILL_FROM
Field fill_to; /// Fill value + STEP < FILL_TO
Field fill_step; /// Default = 1 or -1 according to the direction
};
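An illustrative construction sketch (the column name "x" and the bound values are assumptions, not part of the diff), using the extended SortColumnDescription constructor added below in this header:

FillColumnDescription fill; /// fill the half-open range [0, 10) in steps of 2
fill.fill_from = Field(UInt64(0));
fill.fill_to = Field(UInt64(10));
fill.fill_step = Field(UInt64(2));
SortColumnDescription descr("x", /* direction */ 1, /* nulls_direction */ 1, nullptr, /* with_fill */ true, fill);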
/// Description of the sorting rule by one column.
struct SortColumnDescription
{
@ -20,12 +29,23 @@ struct SortColumnDescription
int nulls_direction; /// 1 - NULLs and NaNs are greater, -1 - less.
/// To achieve NULLS LAST, set it equal to direction, to achieve NULLS FIRST, set it opposite.
std::shared_ptr<Collator> collator; /// Collator for locale-specific comparison of strings
bool with_fill;
FillColumnDescription fill_description;
SortColumnDescription(size_t column_number_, int direction_, int nulls_direction_, const std::shared_ptr<Collator> & collator_ = nullptr)
: column_number(column_number_), direction(direction_), nulls_direction(nulls_direction_), collator(collator_) {}
SortColumnDescription(const std::string & column_name_, int direction_, int nulls_direction_, const std::shared_ptr<Collator> & collator_ = nullptr)
: column_name(column_name_), column_number(0), direction(direction_), nulls_direction(nulls_direction_), collator(collator_) {}
SortColumnDescription(
size_t column_number_, int direction_, int nulls_direction_,
const std::shared_ptr<Collator> & collator_ = nullptr, bool with_fill_ = false,
const FillColumnDescription & fill_description_ = {})
: column_number(column_number_), direction(direction_), nulls_direction(nulls_direction_), collator(collator_)
, with_fill(with_fill_), fill_description(fill_description_) {}
SortColumnDescription(
const std::string & column_name_, int direction_, int nulls_direction_,
const std::shared_ptr<Collator> & collator_ = nullptr, bool with_fill_ = false,
const FillColumnDescription & fill_description_ = {})
: column_name(column_name_), column_number(0), direction(direction_), nulls_direction(nulls_direction_)
, collator(collator_), with_fill(with_fill_), fill_description(fill_description_) {}
bool operator == (const SortColumnDescription & other) const
{

View File

@ -50,8 +50,8 @@ private:
std::vector<ColumnAggregateFunction *> columns_to_aggregate;
std::vector<SimpleAggregateDescription> columns_to_simple_aggregate;
RowRef current_key; /// The current primary key.
RowRef next_key; /// The primary key of the next row.
SharedBlockRowRef current_key; /// The current primary key.
SharedBlockRowRef next_key; /// The primary key of the next row.
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.

View File

@ -47,12 +47,12 @@ private:
/// Read is finished.
bool finished = false;
RowRef current_key; /// The current primary key.
RowRef next_key; /// The primary key of the next row.
SharedBlockRowRef current_key; /// The current primary key.
SharedBlockRowRef next_key; /// The primary key of the next row.
RowRef first_negative; /// The first negative row for the current primary key.
RowRef last_positive; /// The last positive row for the current primary key.
RowRef last_negative; /// Last negative row. It is only stored if there is not one row is written to output.
SharedBlockRowRef first_negative; /// The first negative row for the current primary key.
SharedBlockRowRef last_positive; /// The last positive row for the current primary key.
SharedBlockRowRef last_negative; /// Last negative row. It is only stored if not a single row has been written to the output.
size_t count_positive = 0; /// The number of positive rows for the current primary key.
size_t count_negative = 0; /// The number of negative rows for the current primary key.

View File

@ -0,0 +1,186 @@
#include <DataStreams/FillingBlockInputStream.h>
#include <Interpreters/convertFieldToType.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_WITH_FILL_EXPRESSION;
}
FillingBlockInputStream::FillingBlockInputStream(
const BlockInputStreamPtr & input, const SortDescription & sort_description_)
: sort_description(sort_description_), filling_row(sort_description_), next_row(sort_description_)
{
children.push_back(input);
header = children.at(0)->getHeader();
std::vector<bool> is_fill_column(header.columns());
for (const auto & elem : sort_description)
is_fill_column[header.getPositionByName(elem.column_name)] = true;
auto try_convert_fields = [](FillColumnDescription & descr, const DataTypePtr & type)
{
auto max_type = Field::Types::Null;
WhichDataType which(type);
DataTypePtr to_type;
if (isInteger(type) || which.isDateOrDateTime())
{
max_type = Field::Types::Int64;
to_type = std::make_shared<DataTypeInt64>();
}
else if (which.isFloat())
{
max_type = Field::Types::Float64;
to_type = std::make_shared<DataTypeFloat64>();
}
if (descr.fill_from.getType() > max_type || descr.fill_to.getType() > max_type
|| descr.fill_step.getType() > max_type)
return false;
descr.fill_from = convertFieldToType(descr.fill_from, *to_type);
descr.fill_to = convertFieldToType(descr.fill_to, *to_type);
descr.fill_step = convertFieldToType(descr.fill_step, *to_type);
return true;
};
for (size_t i = 0; i < header.columns(); ++i)
{
if (is_fill_column[i])
{
size_t pos = fill_column_positions.size();
auto & descr = filling_row.getFillDescription(pos);
auto type = header.getByPosition(i).type;
if (!try_convert_fields(descr, type))
throw Exception("Incompatible types of WITH FILL expression values with column type "
+ type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
if (type->isValueRepresentedByUnsignedInteger() &&
((!descr.fill_from.isNull() && less(descr.fill_from, Field{0}, 1)) ||
(!descr.fill_to.isNull() && less(descr.fill_to, Field{0}, 1))))
{
throw Exception("WITH FILL bound values cannot be negative for unsigned type "
+ type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
}
fill_column_positions.push_back(i);
}
else
other_column_positions.push_back(i);
}
}
Block FillingBlockInputStream::readImpl()
{
Columns old_fill_columns;
Columns old_other_columns;
MutableColumns res_fill_columns;
MutableColumns res_other_columns;
auto init_columns_by_positions = [](const Block & block, Columns & columns,
MutableColumns & mutable_columns, const Positions & positions)
{
for (size_t pos : positions)
{
auto column = block.getByPosition(pos).column;
columns.push_back(column);
mutable_columns.push_back(column->cloneEmpty()->assumeMutable());
}
};
auto block = children.back()->read();
if (!block)
{
init_columns_by_positions(header, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(header, old_other_columns, res_other_columns, other_column_positions);
bool should_insert_first = next_row < filling_row;
bool generated = false;
for (size_t i = 0; i < filling_row.size(); ++i)
next_row[i] = filling_row.getFillDescription(i).fill_to;
if (should_insert_first && filling_row < next_row)
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
while (filling_row.next(next_row))
{
generated = true;
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
}
if (generated)
return createResultBlock(res_fill_columns, res_other_columns);
return block;
}
size_t rows = block.rows();
init_columns_by_positions(block, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(block, old_other_columns, res_other_columns, other_column_positions);
if (first)
{
for (size_t i = 0; i < filling_row.size(); ++i)
{
auto current_value = (*old_fill_columns[i])[0];
const auto & fill_from = filling_row.getFillDescription(i).fill_from;
if (!fill_from.isNull() && !equals(current_value, fill_from))
{
filling_row.initFromDefaults(i);
if (less(fill_from, current_value, filling_row.getDirection(i)))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
break;
}
filling_row[i] = current_value;
}
first = false;
}
for (size_t row_ind = 0; row_ind < rows; ++row_ind)
{
bool should_insert_first = next_row < filling_row;
for (size_t i = 0; i < filling_row.size(); ++i)
{
auto current_value = (*old_fill_columns[i])[row_ind];
const auto & fill_to = filling_row.getFillDescription(i).fill_to;
if (fill_to.isNull() || less(current_value, fill_to, filling_row.getDirection(i)))
next_row[i] = current_value;
else
next_row[i] = fill_to;
}
/// A case when at the previous step the row was initialized from the default 'fill_from' values
/// and we probably need to insert it into the block.
if (should_insert_first && filling_row < next_row)
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
/// Insert generated filling rows into the block while they are less than the current row of the block.
while (filling_row.next(next_row))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
copyRowFromColumns(res_fill_columns, old_fill_columns, row_ind);
copyRowFromColumns(res_other_columns, old_other_columns, row_ind);
}
return createResultBlock(res_fill_columns, res_other_columns);
}
Block FillingBlockInputStream::createResultBlock(MutableColumns & fill_columns, MutableColumns & other_columns) const
{
MutableColumns result_columns(header.columns());
for (size_t i = 0; i < fill_columns.size(); ++i)
result_columns[fill_column_positions[i]] = std::move(fill_columns[i]);
for (size_t i = 0; i < other_columns.size(); ++i)
result_columns[other_column_positions[i]] = std::move(other_columns[i]);
return header.cloneWithColumns(std::move(result_columns));
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/FillingRow.h>
namespace DB
{
/** Implements the WITH FILL modifier of the ORDER BY clause.
* It fills gaps in the data stream with rows that contain the missing values in the columns marked WITH FILL, and defaults in the other columns.
* Optionally, FROM, TO and STEP values can be specified.
*/
class FillingBlockInputStream : public IBlockInputStream
{
public:
FillingBlockInputStream(const BlockInputStreamPtr & input, const SortDescription & fill_description_);
String getName() const override { return "Filling"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
Block createResultBlock(MutableColumns & fill_columns, MutableColumns & other_columns) const;
const SortDescription sort_description; /// Contains only the columns with WITH FILL.
FillingRow filling_row; /// Current row, which is used to fill gaps.
FillingRow next_row; /// Row up to which we need to generate filling rows.
Block header;
using Positions = std::vector<size_t>;
Positions fill_column_positions;
Positions other_column_positions;
bool first = true;
};
}
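A hedged wiring sketch (assumptions: `input` is any BlockInputStreamPtr already sorted by column "x" ascending; the values are illustrative and correspond to ORDER BY x WITH FILL FROM 0 TO 10 STEP 2):

SortDescription fill_descr;
fill_descr.emplace_back("x", 1, 1, nullptr, true, FillColumnDescription{Field(UInt64(0)), Field(UInt64(10)), Field(UInt64(2))});
BlockInputStreamPtr filled = std::make_shared<FillingBlockInputStream>(input, fill_descr);
/// Reading from `filled` yields the original rows plus generated rows 0, 2, 4, ... for the gaps,
/// with default values in the non-fill columns.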

View File

@ -321,7 +321,7 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m
}
void GraphiteRollupSortedBlockInputStream::accumulateRow(RowRef & row)
void GraphiteRollupSortedBlockInputStream::accumulateRow(SharedBlockRowRef & row)
{
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
if (aggregate_state_created)

View File

@ -204,7 +204,7 @@ private:
StringRef current_group_path;
/// Last row with maximum version for current primary key (time bucket).
RowRef current_subgroup_newest_row;
SharedBlockRowRef current_subgroup_newest_row;
/// Time of last read row
time_t current_time = 0;
@ -236,7 +236,7 @@ private:
void finishCurrentGroup(MutableColumns & merged_columns);
/// Update the state of the aggregate function with the new `value`.
void accumulateRow(RowRef & row);
void accumulateRow(SharedBlockRowRef & row);
};
}

View File

@ -6,8 +6,30 @@
namespace DB
{
LimitBlockInputStream::LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_, bool use_limit_as_total_rows_approx)
: limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_)
/// Gets pointers to all columns of the block that were used for ORDER BY
static ColumnRawPtrs extractSortColumns(const Block & block, const SortDescription & description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
LimitBlockInputStream::LimitBlockInputStream(
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_,
bool use_limit_as_total_rows_approx, bool with_ties_, const SortDescription & description_)
: limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_), with_ties(with_ties_)
, description(description_)
{
if (use_limit_as_total_rows_approx)
{
@ -17,13 +39,45 @@ LimitBlockInputStream::LimitBlockInputStream(const BlockInputStreamPtr & input,
children.push_back(input);
}
Block LimitBlockInputStream::readImpl()
{
Block res;
UInt64 rows = 0;
/// pos - how many rows were read, including the last read block.
/// If pos >= offset + limit and all rows at the end of the previous block were equal
/// to the row at the 'limit' position, we also have to check the current block.
if (!ties_row_ref.empty() && pos >= offset + limit)
{
res = children.back()->read();
rows = res.rows();
if (!res)
return res;
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
ptr->sort_columns = extractSortColumns(*ptr, description);
UInt64 len;
for (len = 0; len < rows; ++len)
{
SharedBlockRowRef current_row;
current_row.set(ptr, &ptr->sort_columns, len);
if (current_row != ties_row_ref)
{
ties_row_ref.reset();
break;
}
}
if (len < rows)
{
for (size_t i = 0; i < ptr->columns(); ++i)
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(0, len);
}
return *ptr;
}
if (pos >= offset + limit)
{
@ -46,9 +100,18 @@ Block LimitBlockInputStream::readImpl()
pos += rows;
} while (pos <= offset);
/// return the whole block
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
if (with_ties)
ptr->sort_columns = extractSortColumns(*ptr, description);
/// give away the whole block
if (pos >= offset + rows && pos <= offset + limit)
return res;
{
/// Save the row ref for the last row, because the next block probably begins with the same row.
if (with_ties && pos == offset + limit)
ties_row_ref.set(ptr, &ptr->sort_columns, rows - 1);
return *ptr;
}
/// give away a piece of the block
UInt64 start = std::max(
@ -60,13 +123,36 @@ Block LimitBlockInputStream::readImpl()
static_cast<Int64>(pos) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));
for (size_t i = 0; i < res.columns(); ++i)
res.getByPosition(i).column = res.getByPosition(i).column->cut(start, length);
/// Check whether other rows in the current block are equal to the last one within the limit
if (with_ties)
{
ties_row_ref.set(ptr, &ptr->sort_columns, start + length - 1);
for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i)
{
SharedBlockRowRef current_row;
current_row.set(ptr, &ptr->sort_columns, i);
if (current_row == ties_row_ref)
++length;
else
{
ties_row_ref.reset();
break;
}
}
}
if (length == rows)
return *ptr;
for (size_t i = 0; i < ptr->columns(); ++i)
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(start, length);
// TODO: we should provide feedback to child-block, so it will know how many rows are actually consumed.
// It's crucial for streaming engines like Kafka.
return res;
return *ptr;
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Common/SharedBlockRowRef.h>
namespace DB
@ -17,8 +18,13 @@ public:
* If always_read_till_end = true - reads all the data to the end, but ignores them. This is necessary in rare cases:
* when otherwise, due to the cancellation of the request, we would not have received the data for GROUP BY WITH TOTALS from the remote server.
* If use_limit_as_total_rows_approx = true, then addTotalRowsApprox is called to use the limit in progress & stats
* with_ties = true when the query has the WITH TIES modifier; if so, a description should be provided.
* The description lets us know which rows should be checked for equality.
*/
LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_ = false, bool use_limit_as_total_rows_approx = false);
LimitBlockInputStream(
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_,
bool always_read_till_end_ = false, bool use_limit_as_total_rows_approx = false,
bool with_ties_ = false, const SortDescription & description_ = {});
String getName() const override { return "Limit"; }
@ -32,6 +38,9 @@ private:
UInt64 offset;
UInt64 pos = 0;
bool always_read_till_end;
bool with_ties;
const SortDescription description;
SharedBlockRowRef ties_row_ref;
};
}
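A hedged usage sketch (assumptions: `input` is a BlockInputStreamPtr already sorted by column "x"; the limit value is illustrative). It keeps the first 3 rows plus every following row that is equal to the 3rd one on the ORDER BY columns, which is the WITH TIES behaviour:

SortDescription order_descr;
order_descr.emplace_back("x", 1, 1);
auto limited = std::make_shared<LimitBlockInputStream>(
    input, /* limit */ 3, /* offset */ 0,
    /* always_read_till_end */ false, /* use_limit_as_total_rows_approx */ false,
    /* with_ties */ true, order_descr);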

View File

@ -5,6 +5,7 @@
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <common/logger_useful.h>
#include <Common/SharedBlockRowRef.h>
#include <Core/Row.h>
#include <Core/SortDescription.h>
@ -24,39 +25,6 @@ namespace ErrorCodes
}
/// Allows you refer to the row in the block and hold the block ownership,
/// and thus avoid creating a temporary row object.
/// Do not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
/// does not use Poco::SharedPtr, since you need to allocate a block and `refcount` in one piece;
/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr;
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
struct SharedBlock : Block
{
int refcount = 0;
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedBlock(Block && block) : Block(std::move(block)) {}
};
}
using SharedBlockPtr = boost::intrusive_ptr<detail::SharedBlock>;
inline void intrusive_ptr_add_ref(detail::SharedBlock * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedBlock * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
/** Merges several sorted streams into one sorted stream.
*/
class MergingSortedBlockInputStream : public IBlockInputStream
@ -78,44 +46,6 @@ public:
Block getHeader() const override { return header; }
protected:
struct RowRef
{
ColumnRawPtrs * columns = nullptr;
size_t row_num = 0;
SharedBlockPtr shared_block;
void swap(RowRef & other)
{
std::swap(columns, other.columns);
std::swap(row_num, other.row_num);
std::swap(shared_block, other.shared_block);
}
/// The number and types of columns must match.
bool operator==(const RowRef & other) const
{
size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1))
return false;
return true;
}
bool operator!=(const RowRef & other) const
{
return !(*this == other);
}
void reset()
{
RowRef empty;
swap(empty);
}
bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); }
};
/// Simple class, which allows to check stop condition during merge process
/// in simple case it just compare amount of merged rows with max_block_size
/// in `count_average` case it compares amount of merged rows with linear combination
@ -148,7 +78,6 @@ protected:
}
};
Block readImpl() override;
void readSuffixImpl() override;
@ -230,7 +159,7 @@ protected:
}
template <typename TSortCursor>
void setRowRef(RowRef & row_ref, TSortCursor & cursor)
void setRowRef(SharedBlockRowRef & row_ref, TSortCursor & cursor)
{
row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order];
@ -238,7 +167,7 @@ protected:
}
template <typename TSortCursor>
void setPrimaryKeyRef(RowRef & row_ref, TSortCursor & cursor)
void setPrimaryKeyRef(SharedBlockRowRef & row_ref, TSortCursor & cursor)
{
row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order];

View File

@ -41,11 +41,11 @@ private:
bool finished = false;
/// Primary key of current row.
RowRef current_key;
SharedBlockRowRef current_key;
/// Primary key of next row.
RowRef next_key;
SharedBlockRowRef next_key;
/// Last row with maximum version for current primary key.
RowRef selected_row;
SharedBlockRowRef selected_row;
/// The position (into current_row_sources) of the row with the highest version.
size_t max_pos = 0;

View File

@ -129,8 +129,8 @@ private:
std::vector<AggregateDescription> columns_to_aggregate;
std::vector<MapDescription> maps_to_sum;
RowRef current_key; /// The current primary key.
RowRef next_key; /// The primary key of the next row.
SharedBlockRowRef current_key; /// The current primary key.
SharedBlockRowRef next_key; /// The primary key of the next row.
Row current_row;
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.

View File

@ -47,7 +47,7 @@ void VersionedCollapsingSortedBlockInputStream::insertGap(size_t gap_size)
}
}
void VersionedCollapsingSortedBlockInputStream::insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns)
void VersionedCollapsingSortedBlockInputStream::insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns)
{
const auto & columns = row.shared_block->all_columns;
for (size_t i = 0; i < num_columns; ++i)
@ -111,7 +111,7 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co
SortCursor current = queue.top();
size_t current_block_granularity = current->rows;
RowRef next_key;
SharedBlockRowRef next_key;
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];

View File

@ -197,7 +197,7 @@ private:
Int8 sign_in_queue = 0;
const size_t max_rows_in_queue;
/// Rows with the same primary key and sign.
FixedSizeDequeWithGaps<RowRef> current_keys;
FixedSizeDequeWithGaps<SharedBlockRowRef> current_keys;
size_t blocks_written = 0;
@ -207,7 +207,7 @@ private:
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/// Output to result row for the current primary key.
void insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns);
void insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns);
void insertGap(size_t gap_size);
};

View File

@ -1,4 +1,5 @@
#include <IO/HDFSCommon.h>
#include <Poco/URI.h>
#if USE_HDFS
#include <Common/Exception.h>
@ -11,8 +12,9 @@ extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
}
HDFSBuilderPtr createHDFSBuilder(const Poco::URI & uri)
HDFSBuilderPtr createHDFSBuilder(const std::string & uri_str)
{
const Poco::URI uri(uri_str);
auto & host = uri.getHost();
auto port = uri.getPort();
auto & path = uri.getPath();

View File

@ -1,7 +1,6 @@
#include <Common/config.h>
#include <memory>
#include <type_traits>
#include <Poco/URI.h>
#if USE_HDFS
#include <hdfs/hdfs.h>
@ -27,12 +26,32 @@ struct HDFSFsDeleter
}
struct HDFSFileInfo
{
hdfsFileInfo * file_info;
int length;
HDFSFileInfo()
: file_info(nullptr)
, length(0)
{
}
HDFSFileInfo(const HDFSFileInfo & other) = delete;
HDFSFileInfo(HDFSFileInfo && other) = default;
HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
HDFSFileInfo & operator=(HDFSFileInfo && other) = default;
~HDFSFileInfo()
{
hdfsFreeFileInfo(file_info, length);
}
};
using HDFSBuilderPtr = std::unique_ptr<hdfsBuilder, detail::HDFSBuilderDeleter>;
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
// Set the read/connect timeout; the default value in libhdfs3 is about 1 hour, which is too large.
/// TODO: Allow tuning it from the query Settings.
HDFSBuilderPtr createHDFSBuilder(const Poco::URI & hdfs_uri);
HDFSBuilderPtr createHDFSBuilder(const std::string & hdfs_uri);
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
}
#endif

View File

@ -2,7 +2,6 @@
#if USE_HDFS
#include <IO/HDFSCommon.h>
#include <Poco/URI.h>
#include <hdfs/hdfs.h>
@ -16,7 +15,7 @@ namespace ErrorCodes
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
{
Poco::URI hdfs_uri;
std::string hdfs_uri;
hdfsFile fin;
HDFSBuilderPtr builder;
HDFSFSPtr fs;
@ -26,8 +25,8 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
, builder(createHDFSBuilder(hdfs_uri))
, fs(createHDFSFS(builder.get()))
{
auto & path = hdfs_uri.getPath();
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const std::string path = hdfs_uri.substr(begin_of_path);
fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);
if (fin == nullptr)
@ -39,7 +38,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
{
int bytes_read = hdfsRead(fs.get(), fin, start, size);
if (bytes_read < 0)
throw Exception("Fail to read HDFS file: " + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return bytes_read;
}
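For clarity, a small sketch (hypothetical URI, not from the diff) of what the manual path extraction above produces now that Poco::URI is no longer used:

const std::string hdfs_uri = "hdfs://namenode:8020/some/dir/file.tsv";
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2); /// index of the '/' before "some"
const std::string path = hdfs_uri.substr(begin_of_path);                  /// "/some/dir/file.tsv"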

View File

@ -2,7 +2,6 @@
#if USE_HDFS
#include <Poco/URI.h>
#include <IO/WriteBufferFromHDFS.h>
#include <IO/HDFSCommon.h>
#include <hdfs/hdfs.h>
@ -21,7 +20,7 @@ extern const int CANNOT_FSYNC;
struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
{
Poco::URI hdfs_uri;
std::string hdfs_uri;
hdfsFile fout;
HDFSBuilderPtr builder;
HDFSFSPtr fs;
@ -31,7 +30,11 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
, builder(createHDFSBuilder(hdfs_uri))
, fs(createHDFSFS(builder.get()))
{
auto & path = hdfs_uri.getPath();
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const std::string path = hdfs_uri.substr(begin_of_path);
if (path.find_first_of("*?{") != std::string::npos)
throw Exception("URI '" + hdfs_uri + "' contains globs, so the table is in readonly mode", ErrorCodes::CANNOT_OPEN_FILE);
fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0);
if (fout == nullptr)
@ -52,7 +55,7 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
{
int bytes_written = hdfsWrite(fs.get(), fout, start, size);
if (bytes_written < 0)
throw Exception("Fail to write HDFS file: " + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
throw Exception("Fail to write HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return bytes_written;
}
@ -61,7 +64,7 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
{
int result = hdfsSync(fs.get(), fout);
if (result < 0)
throwFromErrno("Cannot HDFS sync" + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
throwFromErrno("Cannot HDFS sync" + hdfs_uri + " " + std::string(hdfsGetLastError()),
ErrorCodes::CANNOT_FSYNC);
}
};

View File

@ -0,0 +1,127 @@
#include <Interpreters/FillingRow.h>
namespace DB
{
bool less(const Field & lhs, const Field & rhs, int direction)
{
if (direction == -1)
return applyVisitor(FieldVisitorAccurateLess(), rhs, lhs);
return applyVisitor(FieldVisitorAccurateLess(), lhs, rhs);
}
bool equals(const Field & lhs, const Field & rhs)
{
return applyVisitor(FieldVisitorAccurateEquals(), lhs, rhs);
}
FillingRow::FillingRow(const SortDescription & description_) : description(description_)
{
row.resize(description.size());
}
bool FillingRow::operator<(const FillingRow & other) const
{
for (size_t i = 0; i < size(); ++i)
{
if (row[i].isNull() || other[i].isNull() || equals(row[i], other[i]))
continue;
return less(row[i], other[i], getDirection(i));
}
return false;
}
bool FillingRow::operator==(const FillingRow & other) const
{
for (size_t i = 0; i < size(); ++i)
if (!equals(row[i], other[i]))
return false;
return true;
}
bool FillingRow::next(const FillingRow & to_row)
{
size_t pos = 0;
/// Find position we need to increment for generating next row.
for (; pos < row.size(); ++pos)
if (!row[pos].isNull() && !to_row[pos].isNull() && !equals(row[pos], to_row[pos]))
break;
if (pos == row.size() || less(to_row[pos], row[pos], getDirection(pos)))
return false;
/// If we have any 'fill_to' value at position greater than 'pos',
/// we need to generate rows up to 'fill_to' value.
for (size_t i = row.size() - 1; i > pos; --i)
{
if (getFillDescription(i).fill_to.isNull() || row[i].isNull())
continue;
auto next_value = row[i];
applyVisitor(FieldVisitorSum(getFillDescription(i).fill_step), next_value);
if (less(next_value, getFillDescription(i).fill_to, getDirection(i)))
{
row[i] = next_value;
initFromDefaults(i + 1);
return true;
}
}
auto next_value = row[pos];
applyVisitor(FieldVisitorSum(getFillDescription(pos).fill_step), next_value);
if (less(to_row[pos], next_value, getDirection(pos)))
return false;
row[pos] = next_value;
if (equals(row[pos], to_row[pos]))
{
bool is_less = false;
for (size_t i = pos + 1; i < size(); ++i)
{
const auto & fill_from = getFillDescription(i).fill_from;
if (!fill_from.isNull())
row[i] = fill_from;
else
row[i] = to_row[i];
is_less |= less(row[i], to_row[i], getDirection(i));
}
return is_less;
}
initFromDefaults(pos + 1);
return true;
}
void FillingRow::initFromDefaults(size_t from_pos)
{
for (size_t i = from_pos; i < row.size(); ++i)
row[i] = getFillDescription(i).fill_from;
}
void insertFromFillingRow(MutableColumns & filling_columns, MutableColumns & other_columns, const FillingRow & filling_row)
{
for (size_t i = 0; i < filling_columns.size(); ++i)
{
if (filling_row[i].isNull())
filling_columns[i]->insertDefault();
else
filling_columns[i]->insert(filling_row[i]);
}
for (size_t i = 0; i < other_columns.size(); ++i)
other_columns[i]->insertDefault();
}
void copyRowFromColumns(MutableColumns & dest, const Columns & source, size_t row_num)
{
for (size_t i = 0; i < source.size(); ++i)
dest[i]->insertFrom(*source[i], row_num);
}
}

View File

@ -0,0 +1,44 @@
#pragma once
#include <Core/SortDescription.h>
#include <Columns/IColumn.h>
#include <Common/FieldVisitors.h>
namespace DB
{
/// Compares fields in terms of sorting order, considering direction.
bool less(const Field & lhs, const Field & rhs, int direction);
bool equals(const Field & lhs, const Field & rhs);
/** Helps to implement modifier WITH FILL for ORDER BY clause.
* Stores a row as an array of Fields and provides functions to generate the next row for filling gaps and to compare rows.
* Used in FillingBlockInputStream and in FillingTransform.
*/
class FillingRow
{
public:
FillingRow(const SortDescription & sort_description);
/// Generates next row according to fill 'from', 'to' and 'step' values.
bool next(const FillingRow & to_row);
void initFromDefaults(size_t from_pos = 0);
Field & operator[](size_t ind) { return row[ind]; }
const Field & operator[](size_t ind) const { return row[ind]; }
size_t size() const { return row.size(); }
bool operator<(const FillingRow & other) const;
bool operator==(const FillingRow & other) const;
int getDirection(size_t ind) const { return description[ind].direction; }
FillColumnDescription & getFillDescription(size_t ind) { return description[ind].fill_description; }
private:
std::vector<Field> row;
SortDescription description;
};
void insertFromFillingRow(MutableColumns & filling_columns, MutableColumns & other_columns, const FillingRow & filling_row);
void copyRowFromColumns(MutableColumns & dest, const Columns & source, size_t row_num);
}
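A hedged sketch of the generation loop (one fill column; the values correspond to an illustrative ORDER BY x WITH FILL FROM 1 TO 5 and are not from the diff):

SortDescription descr;
descr.emplace_back("x", 1, 1, nullptr, true, FillColumnDescription{Field(Int64(1)), Field(Int64(5)), Field(Int64(1))});
FillingRow filling_row(descr);
FillingRow to_row(descr);
filling_row.initFromDefaults();   /// filling_row = {1}, taken from fill_from
to_row[0] = Field(Int64(4));      /// the next real value arriving from the data
while (filling_row.next(to_row))  /// generates 2, then 3, then stops; 4 comes from the data itself
{
    /// insertFromFillingRow(...) is called here by FillingBlockInputStream / FillingTransform
}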

View File

@ -24,6 +24,7 @@
#include <DataStreams/ConvertColumnLowCardinalityToFullBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/ReverseBlockInputStream.h>
#include <DataStreams/FillingBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -57,6 +58,7 @@
#include <Core/Field.h>
#include <Core/Types.h>
#include <Columns/Collator.h>
#include <Common/FieldVisitors.h>
#include <Common/typeid_cast.h>
#include <Common/checkStackSize.h>
#include <Parsers/queryToString.h>
@ -81,6 +83,7 @@
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <Processors/Transforms/RollupTransform.h>
#include <Processors/Transforms/CubeTransform.h>
#include <Processors/Transforms/FillingTransform.h>
#include <Processors/LimitTransform.h>
#include <Processors/Transforms/FinishSortingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
@ -103,6 +106,7 @@ namespace ErrorCodes
extern const int PARAMETER_OUT_OF_BOUND;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int INVALID_LIMIT_EXPRESSION;
extern const int INVALID_WITH_FILL_EXPRESSION;
}
namespace
@ -681,8 +685,62 @@ InterpreterSelectQuery::analyzeExpressions(
return res;
}
static Field getWithFillFieldValue(const ASTPtr & node, const Context & context)
{
const auto & [field, type] = evaluateConstantExpression(node, context);
if (!isColumnedAsNumber(type))
throw Exception("Illegal type " + type->getName() + " of WITH FILL expression, must be numeric type", ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
return field;
}
static FillColumnDescription getWithFillDescription(const ASTOrderByElement & order_by_elem, const Context & context)
{
FillColumnDescription descr;
if (order_by_elem.fill_from)
descr.fill_from = getWithFillFieldValue(order_by_elem.fill_from, context);
if (order_by_elem.fill_to)
descr.fill_to = getWithFillFieldValue(order_by_elem.fill_to, context);
if (order_by_elem.fill_step)
descr.fill_step = getWithFillFieldValue(order_by_elem.fill_step, context);
else
descr.fill_step = order_by_elem.direction;
if (applyVisitor(FieldVisitorAccurateEquals(), descr.fill_step, Field{0}))
throw Exception("WITH FILL STEP value cannot be zero", ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
if (order_by_elem.direction == 1)
{
if (applyVisitor(FieldVisitorAccurateLess(), descr.fill_step, Field{0}))
throw Exception("WITH FILL STEP value cannot be negative for sorting in ascending direction",
ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
if (!descr.fill_from.isNull() && !descr.fill_to.isNull() &&
applyVisitor(FieldVisitorAccurateLess(), descr.fill_to, descr.fill_from))
{
throw Exception("WITH FILL TO value cannot be less than FROM value for sorting in ascending direction",
ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
}
}
else
{
if (applyVisitor(FieldVisitorAccurateLess(), Field{0}, descr.fill_step))
throw Exception("WITH FILL STEP value cannot be positive for sorting in descending direction",
ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
if (!descr.fill_from.isNull() && !descr.fill_to.isNull() &&
applyVisitor(FieldVisitorAccurateLess(), descr.fill_from, descr.fill_to))
{
throw Exception("WITH FILL FROM value cannot be less than TO value for sorting in descending direction",
ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
}
}
return descr;
}
static SortDescription getSortDescription(const ASTSelectQuery & query, const Context & context)
{
SortDescription order_descr;
order_descr.reserve(query.orderBy()->children.size());
@ -695,13 +753,19 @@ static SortDescription getSortDescription(const ASTSelectQuery & query)
if (order_by_elem.collation)
collator = std::make_shared<Collator>(order_by_elem.collation->as<ASTLiteral &>().value.get<String>());
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator);
if (order_by_elem.with_fill)
{
FillColumnDescription fill_desc = getWithFillDescription(order_by_elem, context);
order_descr.emplace_back(name, order_by_elem.direction,
order_by_elem.nulls_direction, collator, true, fill_desc);
}
else
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator);
}
return order_descr;
}
static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context)
{
const auto & [field, type] = evaluateConstantExpression(node, context);
@ -736,7 +800,7 @@ static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery &
static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & context)
{
/// Partial sort can be done if there is LIMIT, but no DISTINCT, LIMIT BY or WITH TIES.
if (!query.distinct && !query.limitBy())
if (!query.distinct && !query.limitBy() && !query.limit_with_ties)
{
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
return limit_length + limit_offset;
@ -751,7 +815,7 @@ static SortingInfoPtr optimizeReadInOrder(const MergeTreeData & merge_tree, cons
if (!merge_tree.hasSortingKey())
return {};
auto order_descr = getSortDescription(query);
auto order_descr = getSortDescription(query, context);
SortDescription prefix_order_descr;
int read_direction = order_descr.at(0).direction;
@ -1173,7 +1237,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
/** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
* limiting the number of rows in each up to `offset + limit`.
*/
if (query.limitLength() && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.has_limit_by && !settings.extremes)
if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.has_limit_by && !settings.extremes)
{
executePreLimit(pipeline);
}
@ -1206,6 +1270,8 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
executeLimitBy(pipeline);
}
executeWithFill(pipeline);
/** We must do projection after DISTINCT because projection may remove some columns.
*/
executeProjection(pipeline, expressions.final_projection);
@ -1222,7 +1288,6 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
executeSubqueriesInSetsAndJoins(pipeline, expressions.subqueries_for_sets);
}
template <typename TPipeline>
void InterpreterSelectQuery::executeFetchColumns(
QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
@ -1420,11 +1485,12 @@ void InterpreterSelectQuery::executeFetchColumns(
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
/** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY but LIMIT is specified, and limit + offset < max_block_size,
/** Optimization - if DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY and WITH TIES are not specified, but LIMIT is specified, and limit + offset < max_block_size,
* then as the block size we will use limit + offset (not to read more from the table than requested),
* and also set the number of threads to 1.
*/
if (!query.distinct
&& !query.limit_with_ties
&& !query.prewhere()
&& !query.where()
&& !query.groupBy()
@ -2007,7 +2073,7 @@ void InterpreterSelectQuery::executeExpression(QueryPipeline & pipeline, const E
void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr sorting_info)
{
auto & query = getSelectQuery();
SortDescription order_descr = getSortDescription(query);
SortDescription order_descr = getSortDescription(query, context);
const Settings & settings = context.getSettingsRef();
UInt64 limit = getLimitForSorting(query, context);
@ -2079,7 +2145,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr so
void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, SortingInfoPtr sorting_info)
{
auto & query = getSelectQuery();
SortDescription order_descr = getSortDescription(query);
SortDescription order_descr = getSortDescription(query, context);
UInt64 limit = getLimitForSorting(query, context);
const Settings & settings = context.getSettingsRef();
@ -2160,7 +2226,7 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, SortingInfoP
void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
{
auto & query = getSelectQuery();
SortDescription order_descr = getSortDescription(query);
SortDescription order_descr = getSortDescription(query, context);
UInt64 limit = getLimitForSorting(query, context);
const Settings & settings = context.getSettingsRef();
@ -2187,7 +2253,7 @@ void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline)
{
auto & query = getSelectQuery();
SortDescription order_descr = getSortDescription(query);
SortDescription order_descr = getSortDescription(query, context);
UInt64 limit = getLimitForSorting(query, context);
const Settings & settings = context.getSettingsRef();
@ -2234,7 +2300,7 @@ void InterpreterSelectQuery::executeDistinct(Pipeline & pipeline, bool before_or
UInt64 limit_for_distinct = 0;
/// If ORDER BY is not executed after this DISTINCT stage, then no more than limit_length + limit_offset distinct rows can be obtained.
if (!query.orderBy() || !before_order)
if ((!query.orderBy() || !before_order) && !query.limit_with_ties)
limit_for_distinct = limit_length + limit_offset;
pipeline.transform([&](auto & stream)
@ -2303,9 +2369,16 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
if (query.limitLength())
{
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
SortDescription sort_descr;
if (query.limit_with_ties)
{
if (!query.orderBy())
throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR);
sort_descr = getSortDescription(query, context);
}
pipeline.transform([&, limit = limit_length + limit_offset](auto & stream)
{
stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, false);
stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, false, false, query.limit_with_ties, sort_descr);
});
}
}
@ -2417,17 +2490,73 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
if (!query.group_by_with_totals && hasWithTotalsInAnySubqueryInFromClause(query))
always_read_till_end = true;
SortDescription order_descr;
if (query.limit_with_ties)
{
if (!query.orderBy())
throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR);
order_descr = getSortDescription(query, context);
}
UInt64 limit_length;
UInt64 limit_offset;
std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, context);
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, always_read_till_end);
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, always_read_till_end, false, query.limit_with_ties, order_descr);
});
}
}
void InterpreterSelectQuery::executeWithFill(Pipeline & pipeline)
{
auto & query = getSelectQuery();
if (query.orderBy())
{
SortDescription order_descr = getSortDescription(query, context);
SortDescription fill_descr;
for (auto & desc : order_descr)
{
if (desc.with_fill)
fill_descr.push_back(desc);
}
if (fill_descr.empty())
return;
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<FillingBlockInputStream>(stream, fill_descr);
});
}
}
void InterpreterSelectQuery::executeWithFill(QueryPipeline & pipeline)
{
auto & query = getSelectQuery();
if (query.orderBy())
{
SortDescription order_descr = getSortDescription(query, context);
SortDescription fill_descr;
for (auto & desc : order_descr)
{
if (desc.with_fill)
fill_descr.push_back(desc);
}
if (fill_descr.empty())
return;
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FillingTransform>(header, fill_descr);
});
}
}
void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline)
{
auto & query = getSelectQuery();
@ -2455,13 +2584,21 @@ void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline)
UInt64 limit_offset;
std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, context);
SortDescription order_descr;
if (query.limit_with_ties)
{
if (!query.orderBy())
throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR);
order_descr = getSortDescription(query, context);
}
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<LimitTransform>(
header, limit_length, limit_offset, always_read_till_end);
header, limit_length, limit_offset, always_read_till_end, query.limit_with_ties, order_descr);
});
}
}

View File

@ -204,6 +204,7 @@ private:
void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(Pipeline & pipeline, SortingInfoPtr sorting_info);
void executeWithFill(Pipeline & pipeline);
void executeMergeSorted(Pipeline & pipeline);
void executePreLimit(Pipeline & pipeline);
void executeUnion(Pipeline & pipeline, Block header); /// If header is not empty, convert streams structure to it.
@ -221,6 +222,7 @@ private:
void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(QueryPipeline & pipeline, SortingInfoPtr sorting_info);
void executeWithFill(QueryPipeline & pipeline);
void executeMergeSorted(QueryPipeline & pipeline);
void executePreLimit(QueryPipeline & pipeline);
void executeLimitBy(QueryPipeline & pipeline);

View File

@ -25,6 +25,26 @@ void ASTOrderByElement::formatImpl(const FormatSettings & settings, FormatState
settings.ostr << (settings.hilite ? hilite_keyword : "") << " COLLATE " << (settings.hilite ? hilite_none : "");
collation->formatImpl(settings, state, frame);
}
if (with_fill)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH FILL " << (settings.hilite ? hilite_none : "");
if (fill_from)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM " << (settings.hilite ? hilite_none : "");
fill_from->formatImpl(settings, state, frame);
}
if (fill_to)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " TO " << (settings.hilite ? hilite_none : "");
fill_to->formatImpl(settings, state, frame);
}
if (fill_step)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " STEP " << (settings.hilite ? hilite_none : "");
fill_step->formatImpl(settings, state, frame);
}
}
}
}

View File

@ -18,12 +18,22 @@ public:
/** Collation for locale-specific string comparison. If empty, sorting is done by bytes. */
ASTPtr collation;
bool with_fill;
ASTPtr fill_from;
ASTPtr fill_to;
ASTPtr fill_step;
ASTOrderByElement(
const int direction_, const int nulls_direction_, const bool nulls_direction_was_explicitly_specified_, ASTPtr & collation_)
const int direction_, const int nulls_direction_, const bool nulls_direction_was_explicitly_specified_,
ASTPtr & collation_, const bool with_fill_, ASTPtr & fill_from_, ASTPtr & fill_to_, ASTPtr & fill_step_)
: direction(direction_)
, nulls_direction(nulls_direction_)
, nulls_direction_was_explicitly_specified(nulls_direction_was_explicitly_specified_)
, collation(collation_)
, with_fill(with_fill_)
, fill_from(fill_from_)
, fill_to(fill_to_)
, fill_step(fill_step_)
{
}

View File

@ -148,6 +148,8 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
s.ostr << ", ";
}
limitLength()->formatImpl(s, state, frame);
if (limit_with_ties)
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << " WITH TIES" << (s.hilite ? hilite_none : "");
}
if (settings())

View File

@ -42,6 +42,7 @@ public:
bool group_by_with_totals = false;
bool group_by_with_rollup = false;
bool group_by_with_cube = false;
bool limit_with_ties = false;
ASTPtr & refSelect() { return getExpression(Expression::SELECT); }
ASTPtr & refTables() { return getExpression(Expression::TABLES); }

View File

@ -1360,7 +1360,12 @@ bool ParserOrderByElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
ParserKeyword first("FIRST");
ParserKeyword last("LAST");
ParserKeyword collate("COLLATE");
ParserKeyword with_fill("WITH FILL");
ParserKeyword from("FROM");
ParserKeyword to("TO");
ParserKeyword step("STEP");
ParserStringLiteral collate_locale_parser;
ParserExpressionWithOptionalAlias exp_parser(false);
ASTPtr expr_elem;
if (!elem_p.parse(pos, expr_elem, expected))
@ -1395,7 +1400,27 @@ bool ParserOrderByElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
return false;
}
node = std::make_shared<ASTOrderByElement>(direction, nulls_direction, nulls_direction_was_explicitly_specified, locale_node);
/// WITH FILL [FROM x] [TO y] [STEP z]
bool has_with_fill = false;
ASTPtr fill_from;
ASTPtr fill_to;
ASTPtr fill_step;
if (with_fill.ignore(pos))
{
has_with_fill = true;
if (from.ignore(pos) && !exp_parser.parse(pos, fill_from, expected))
return false;
if (to.ignore(pos) && !exp_parser.parse(pos, fill_to, expected))
return false;
if (step.ignore(pos) && !exp_parser.parse(pos, fill_step, expected))
return false;
}
node = std::make_shared<ASTOrderByElement>(
direction, nulls_direction, nulls_direction_was_explicitly_specified, locale_node,
has_with_fill, fill_from, fill_to, fill_step);
node->children.push_back(expr_elem);
if (locale_node)
node->children.push_back(locale_node);

View File

@ -289,6 +289,7 @@ protected:
/** Element of ORDER BY expression - same as expression element, but in addition, ASC[ENDING] | DESC[ENDING] could be specified
* and optionally, NULLS LAST|FIRST
* and optionally, COLLATE 'locale',
* and optionally, WITH FILL [FROM x] [TO y] [STEP z].
*/
class ParserOrderByElement : public IParserBase
{

View File

@ -17,6 +17,7 @@ namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
extern const int TOP_AND_LIMIT_TOGETHER;
extern const int WITH_TIES_WITHOUT_ORDER_BY;
}
@ -41,6 +42,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_rollup("ROLLUP");
ParserKeyword s_cube("CUBE");
ParserKeyword s_top("TOP");
ParserKeyword s_with_ties("WITH TIES");
ParserKeyword s_offset("OFFSET");
ParserNotEmptyExpressionList exp_list(false);
@ -76,7 +78,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
}
/// SELECT [DISTINCT] [TOP N] expr list
/// SELECT [DISTINCT] [TOP N [WITH TIES]] expr list
{
if (!s_select.ignore(pos, expected))
return false;
@ -100,6 +102,9 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!num.parse(pos, limit_length, expected))
return false;
}
if (s_with_ties.ignore(pos, expected))
select_query->limit_with_ties = true;
}
if (!exp_list_for_select_clause.parse(pos, select_expression_list, expected))
@ -197,12 +202,18 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
limit_offset = limit_length;
if (!exp_elem.parse(pos, limit_length, expected))
return false;
if (s_with_ties.ignore(pos, expected))
select_query->limit_with_ties = true;
}
else if (s_offset.ignore(pos, expected))
{
if (!exp_elem.parse(pos, limit_offset, expected))
return false;
}
else if (s_with_ties.ignore(pos, expected))
select_query->limit_with_ties = true;
if (s_by.ignore(pos, expected))
{
limit_by_length = limit_length;
@ -215,7 +226,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
}
/// LIMIT length | LIMIT offset, length
/// LIMIT length [WITH TIES] | LIMIT offset, length [WITH TIES]
if (s_limit.ignore(pos, expected))
{
if (!limit_by_length || limit_length)
@ -237,8 +248,15 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!exp_elem.parse(pos, limit_offset, expected))
return false;
}
if (s_with_ties.ignore(pos, expected))
select_query->limit_with_ties = true;
}
/// WITH TIES was used without ORDER BY
if (!order_expression_list && select_query->limit_with_ties)
throw Exception("Can not use WITH TIES without ORDER BY", ErrorCodes::WITH_TIES_WITHOUT_ORDER_BY);
/// SETTINGS key1 = value1, key2 = value2, ...
if (s_settings.ignore(pos, expected))
{

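To make the accepted syntax concrete, here are the spellings this parser recognizes. The first two queries are copied from the WITH TIES test added later in this commit (the `ties` table is defined there); the TOP spelling follows the grammar comment above and is assumed to behave the same as the LIMIT form. A query that uses WITH TIES without ORDER BY is rejected with WITH_TIES_WITHOUT_ORDER_BY, as coded above.

```sql
SELECT a FROM ties ORDER BY a LIMIT 3 WITH TIES;
SELECT a FROM ties ORDER BY a LIMIT 1, 2 WITH TIES;
SELECT TOP 3 WITH TIES a FROM ties ORDER BY a;  -- TOP form, per the grammar comment above
```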
View File

@ -6,19 +6,26 @@ namespace DB
LimitTransform::LimitTransform(
const Block & header_, size_t limit_, size_t offset_,
bool always_read_till_end_)
bool always_read_till_end_, bool with_ties_,
const SortDescription & description_)
: IProcessor({header_}, {header_})
, input(inputs.front()), output(outputs.front())
, limit(limit_), offset(offset_)
, always_read_till_end(always_read_till_end_)
, with_ties(with_ties_), description(description_)
{
for (const auto & desc : description)
{
if (!desc.column_name.empty())
sort_column_positions.push_back(header_.getPositionByName(desc.column_name));
else
sort_column_positions.push_back(desc.column_number);
}
}
LimitTransform::Status LimitTransform::prepare()
{
/// Check can output.
bool output_finished = false;
if (output.isFinished())
@ -46,7 +53,7 @@ LimitTransform::Status LimitTransform::prepare()
}
/// Check if we are done with pushing.
bool pushing_is_finished = rows_read >= offset + limit;
bool pushing_is_finished = (rows_read >= offset + limit) && ties_row_ref.empty();
if (pushing_is_finished)
{
if (!always_read_till_end)
@ -116,6 +123,13 @@ LimitTransform::Status LimitTransform::prepare()
if (output.hasData())
return Status::PortFull;
if (with_ties && rows_read == offset + limit)
{
SharedChunkPtr shared_chunk = new detail::SharedChunk(current_chunk.clone());
shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns());
ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, shared_chunk->getNumRows() - 1);
}
output.push(std::move(current_chunk));
has_block = false;
@ -132,8 +146,39 @@ LimitTransform::Status LimitTransform::prepare()
void LimitTransform::work()
{
size_t num_rows = current_chunk.getNumRows();
size_t num_columns = current_chunk.getNumColumns();
SharedChunkPtr shared_chunk = new detail::SharedChunk(std::move(current_chunk));
shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns());
size_t num_rows = shared_chunk->getNumRows();
size_t num_columns = shared_chunk->getNumColumns();
if (!ties_row_ref.empty() && rows_read >= offset + limit)
{
UInt64 len;
for (len = 0; len < num_rows; ++len)
{
SharedChunkRowRef current_row;
current_row.set(shared_chunk, &shared_chunk->sort_columns, len);
if (current_row != ties_row_ref)
{
ties_row_ref.reset();
break;
}
}
auto columns = shared_chunk->detachColumns();
if (len < num_rows)
{
for (size_t i = 0; i < num_columns; ++i)
columns[i] = columns[i]->cut(0, len);
}
current_chunk.setColumns(std::move(columns), len);
block_processed = true;
return;
}
/// return a piece of the block
size_t start = std::max(
@ -145,7 +190,33 @@ void LimitTransform::work()
static_cast<Int64>(rows_read) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows)));
auto columns = current_chunk.detachColumns();
/// Check whether subsequent rows in the current block are equal to the last row within the limit
if (with_ties)
{
ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, start + length - 1);
SharedChunkRowRef current_row;
for (size_t i = ties_row_ref.row_num + 1; i < num_rows; ++i)
{
current_row.set(shared_chunk, &shared_chunk->sort_columns, i);
if (current_row == ties_row_ref)
++length;
else
{
ties_row_ref.reset();
break;
}
}
}
if (length == num_rows)
{
current_chunk = std::move(*shared_chunk);
block_processed = true;
return;
}
auto columns = shared_chunk->detachColumns();
for (size_t i = 0; i < num_columns; ++i)
columns[i] = columns[i]->cut(start, length);
@ -155,5 +226,15 @@ void LimitTransform::work()
block_processed = true;
}
ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns)
{
ColumnRawPtrs res;
res.reserve(description.size());
for (size_t pos : sort_column_positions)
res.push_back(columns[pos].get());
return res;
}
}
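The effect on query results, as captured by the WITH TIES test and its reference output in this commit: once the cut point is reached, rows keep being emitted while their sort key equals the last row within the limit.

```sql
-- ties contains the values 1, 1, 2, 2, 2, 2, 3, 3
SELECT a FROM ties ORDER BY a LIMIT 3 WITH TIES;
-- returns 1, 1, 2, 2, 2, 2: the trailing rows tie with the third row
```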

View File

@ -1,7 +1,8 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/SharedChunk.h>
#include <Core/SortDescription.h>
namespace DB
{
@ -23,10 +24,18 @@ private:
UInt64 rows_before_limit_at_least = 0;
bool with_ties;
const SortDescription description;
SharedChunkRowRef ties_row_ref;
std::vector<size_t> sort_column_positions;
ColumnRawPtrs extractSortColumns(const Columns & columns);
public:
LimitTransform(
const Block & header_, size_t limit_, size_t offset_,
bool always_read_till_end_ = false);
bool always_read_till_end_ = false, bool with_ties_ = false,
const SortDescription & description_ = {});
String getName() const override { return "Limit"; }

View File

@ -0,0 +1,91 @@
#pragma once
#include <algorithm>
#include <Processors/Chunk.h>
#include <Columns/IColumn.h>
#include <boost/smart_ptr/intrusive_ptr.hpp>
namespace DB
{
/// Allows you to refer to a row in the block and hold the block ownership,
/// and thus avoid creating a temporary row object.
/// Do not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
/// does not use Poco::SharedPtr, since you need to allocate a block and `refcount` in one piece;
/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr;
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
struct SharedChunk : Chunk
{
int refcount = 0;
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedChunk(Chunk && chunk) : Chunk(std::move(chunk)) {}
};
}
inline void intrusive_ptr_add_ref(detail::SharedChunk * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedChunk * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
using SharedChunkPtr = boost::intrusive_ptr<detail::SharedChunk>;
struct SharedChunkRowRef
{
ColumnRawPtrs * columns = nullptr;
size_t row_num;
SharedChunkPtr shared_block;
void swap(SharedChunkRowRef & other)
{
std::swap(columns, other.columns);
std::swap(row_num, other.row_num);
std::swap(shared_block, other.shared_block);
}
/// The number and types of columns must match.
bool operator==(const SharedChunkRowRef & other) const
{
size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1))
return false;
return true;
}
bool operator!=(const SharedChunkRowRef & other) const
{
return !(*this == other);
}
void reset()
{
SharedChunkRowRef empty;
swap(empty);
}
bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); }
void set(SharedChunkPtr & shared_block_, ColumnRawPtrs * columns_, size_t row_num_)
{
shared_block = shared_block_;
columns = columns_;
row_num = row_num_;
}
};
}

View File

@ -0,0 +1,201 @@
#include <Processors/Transforms/FillingTransform.h>
#include <Interpreters/convertFieldToType.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_WITH_FILL_EXPRESSION;
}
FillingTransform::FillingTransform(
const Block & header_, const SortDescription & sort_description_)
: ISimpleTransform(header_, header_, true)
, sort_description(sort_description_)
, filling_row(sort_description_)
, next_row(sort_description_)
{
std::vector<bool> is_fill_column(header_.columns());
for (const auto & elem : sort_description)
is_fill_column[header_.getPositionByName(elem.column_name)] = true;
auto try_convert_fields = [](FillColumnDescription & descr, const DataTypePtr & type)
{
auto max_type = Field::Types::Null;
WhichDataType which(type);
DataTypePtr to_type;
if (isInteger(type) || which.isDateOrDateTime())
{
max_type = Field::Types::Int64;
to_type = std::make_shared<DataTypeInt64>();
}
else if (which.isFloat())
{
max_type = Field::Types::Float64;
to_type = std::make_shared<DataTypeFloat64>();
}
if (descr.fill_from.getType() > max_type || descr.fill_to.getType() > max_type
|| descr.fill_step.getType() > max_type)
return false;
descr.fill_from = convertFieldToType(descr.fill_from, *to_type);
descr.fill_to = convertFieldToType(descr.fill_to, *to_type);
descr.fill_step = convertFieldToType(descr.fill_step, *to_type);
return true;
};
for (size_t i = 0; i < header_.columns(); ++i)
{
if (is_fill_column[i])
{
size_t pos = fill_column_positions.size();
auto & descr = filling_row.getFillDescription(pos);
auto type = header_.getByPosition(i).type;
if (!try_convert_fields(descr, type))
throw Exception("Incompatible types of WITH FILL expression values with column type "
+ type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
if (type->isValueRepresentedByUnsignedInteger() &&
((!descr.fill_from.isNull() && less(descr.fill_from, Field{0}, 1)) ||
(!descr.fill_to.isNull() && less(descr.fill_to, Field{0}, 1))))
{
throw Exception("WITH FILL bound values cannot be negative for unsigned type "
+ type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
}
fill_column_positions.push_back(i);
}
else
other_column_positions.push_back(i);
}
}
IProcessor::Status FillingTransform::prepare()
{
if (input.isFinished() && !output.isFinished() && !has_input && !generate_suffix)
{
should_insert_first = next_row < filling_row;
for (size_t i = 0; i < filling_row.size(); ++i)
next_row[i] = filling_row.getFillDescription(i).fill_to;
if (filling_row < next_row)
{
generate_suffix = true;
return Status::Ready;
}
}
return ISimpleTransform::prepare();
}
void FillingTransform::transform(Chunk & chunk)
{
Columns old_fill_columns;
Columns old_other_columns;
MutableColumns res_fill_columns;
MutableColumns res_other_columns;
auto init_columns_by_positions = [](const Columns & old_columns, Columns & new_columns,
MutableColumns & new_mutable_columns, const Positions & positions)
{
for (size_t pos : positions)
{
new_columns.push_back(old_columns[pos]);
new_mutable_columns.push_back(old_columns[pos]->cloneEmpty()->assumeMutable());
}
};
if (generate_suffix)
{
const auto & empty_columns = inputs.front().getHeader().getColumns();
init_columns_by_positions(empty_columns, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(empty_columns, old_other_columns, res_other_columns, other_column_positions);
if (should_insert_first && filling_row < next_row)
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
while (filling_row.next(next_row))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
setResultColumns(chunk, res_fill_columns, res_other_columns);
return;
}
size_t num_rows = chunk.getNumRows();
auto old_columns = chunk.detachColumns();
init_columns_by_positions(old_columns, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(old_columns, old_other_columns, res_other_columns, other_column_positions);
if (first)
{
for (size_t i = 0; i < filling_row.size(); ++i)
{
auto current_value = (*old_fill_columns[i])[0];
const auto & fill_from = filling_row.getFillDescription(i).fill_from;
if (!fill_from.isNull() && !equals(current_value, fill_from))
{
filling_row.initFromDefaults(i);
if (less(fill_from, current_value, filling_row.getDirection(i)))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
break;
}
filling_row[i] = current_value;
}
first = false;
}
for (size_t row_ind = 0; row_ind < num_rows; ++row_ind)
{
should_insert_first = next_row < filling_row;
for (size_t i = 0; i < filling_row.size(); ++i)
{
auto current_value = (*old_fill_columns[i])[row_ind];
const auto & fill_to = filling_row.getFillDescription(i).fill_to;
if (fill_to.isNull() || less(current_value, fill_to, filling_row.getDirection(i)))
next_row[i] = current_value;
else
next_row[i] = fill_to;
}
/// Handles the case when, at the previous step, the row was initialized from the default 'fill_from' values
/// and probably needs to be inserted into the block.
if (should_insert_first && filling_row < next_row)
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
/// Insert generated filling rows into the block while they are less than the current row in the block.
while (filling_row.next(next_row))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
copyRowFromColumns(res_fill_columns, old_fill_columns, row_ind);
copyRowFromColumns(res_other_columns, old_other_columns, row_ind);
}
setResultColumns(chunk, res_fill_columns, res_other_columns);
}
void FillingTransform::setResultColumns(Chunk & chunk, MutableColumns & fill_columns, MutableColumns & other_columns) const
{
MutableColumns result_columns(fill_columns.size() + other_columns.size());
/// fill_columns is always non-empty.
size_t num_rows = fill_columns[0]->size();
for (size_t i = 0; i < fill_columns.size(); ++i)
result_columns[fill_column_positions[i]] = std::move(fill_columns[i]);
for (size_t i = 0; i < other_columns.size(); ++i)
result_columns[other_column_positions[i]] = std::move(other_columns[i]);
chunk.setColumns(std::move(result_columns), num_rows);
}
}
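What the transform produces can be seen in the WITH FILL test and reference output included in this commit: for a table with rows (1,-2), (1,3), (3,2), (5,-1), (6,5), (8,0), the missing sort-key values are generated and the other columns get default values.

```sql
SELECT * FROM fill ORDER BY a WITH FILL, b WITH FILL;
-- the gaps a = 2, 4, 7 appear with b = 0, and for a = 1 the column b is filled from -2 up to 3
```

Incompatible WITH FILL values are rejected with INVALID_WITH_FILL_EXPRESSION, for example bounds whose type does not match the column or negative bounds for an unsigned column, as checked in the constructor above.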

View File

@ -0,0 +1,42 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Core/SortDescription.h>
#include <Interpreters/FillingRow.h>
namespace DB
{
/** Implements the WITH FILL modifier of the ORDER BY clause.
* It fills gaps in the data stream with rows that contain the missing values for columns marked WITH FILL and default values for the other columns.
* Optionally, FROM, TO and STEP values can be specified.
*/
class FillingTransform : public ISimpleTransform
{
public:
FillingTransform(const Block & header_, const SortDescription & fill_description_);
String getName() const override { return "FillingTransform"; }
Status prepare() override;
protected:
void transform(Chunk & chunk) override;
private:
void setResultColumns(Chunk & chunk, MutableColumns & fill_columns, MutableColumns & other_columns) const;
const SortDescription sort_description; /// Contains only columns with WITH FILL.
FillingRow filling_row; /// Current row, which is used to fill gaps.
FillingRow next_row; /// Row to which we need to generate filling rows.
using Positions = std::vector<size_t>;
Positions fill_column_positions;
Positions other_column_positions;
bool first = true;
bool generate_suffix = false;
/// Determines whether we should insert the filling row before generating the next rows.
bool should_insert_first = false;
};
}

View File

@ -2,46 +2,13 @@
#include <Processors/IProcessor.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <Processors/SharedChunk.h>
#include <queue>
namespace DB
{
/// Allows you refer to the row in the block and hold the block ownership,
/// and thus avoid creating a temporary row object.
/// Do not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
/// does not use Poco::SharedPtr, since you need to allocate a block and `refcount` in one piece;
/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr;
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
struct SharedChunk : Chunk
{
int refcount = 0;
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedChunk(Chunk && chunk) : Chunk(std::move(chunk)) {}
};
}
using SharedChunkPtr = boost::intrusive_ptr<detail::SharedChunk>;
inline void intrusive_ptr_add_ref(detail::SharedChunk * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedChunk * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
class MergingSortedTransform : public IProcessor
{
public:

View File

@ -18,12 +18,19 @@
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/parseGlobs.h>
#include <fcntl.h>
#include <Poco/Path.h>
#include <Poco/File.h>
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
@ -39,6 +46,54 @@ namespace ErrorCodes
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
}
namespace
{
/* Recursive directory listing with matched paths as a result.
* The same method exists in StorageHDFS.
*/
std::vector<std::string> LSWithRegexpMatching(const std::string & path_for_ls, const std::string & for_match)
{
const size_t first_glob = for_match.find_first_of("*?{");
const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
const std::string suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begins with '/'
const size_t next_slash = suffix_with_globs.find('/', 1);
re2::RE2 matcher(makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash)));
std::vector<std::string> result;
const std::string prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs);
if (!fs::exists(fs::path(prefix_without_globs.data())))
{
return result;
}
const fs::directory_iterator end;
for (fs::directory_iterator it(prefix_without_globs); it != end; ++it)
{
const std::string full_path = it->path().string();
const size_t last_slash = full_path.rfind('/');
const String file_name = full_path.substr(last_slash);
const bool looking_for_directory = next_slash != std::string::npos;
/// The is_directory check determines what kind of path we are dealing with in the current iteration of the listing
if (!fs::is_directory(it->path()) && !looking_for_directory)
{
if (re2::RE2::FullMatch(file_name, matcher))
{
result.push_back(it->path().string());
}
}
else if (fs::is_directory(it->path()) && looking_for_directory)
{
if (re2::RE2::FullMatch(file_name, matcher))
{
Strings result_part = LSWithRegexpMatching(full_path + "/", suffix_with_globs.substr(next_slash));
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
}
}
return result;
}
}
static std::string getTablePath(const std::string & db_dir_path, const std::string & table_name, const std::string & format_name)
{
@ -90,8 +145,10 @@ StorageFile::StorageFile(
if (poco_path.isRelative())
poco_path = Poco::Path(db_dir_path, poco_path);
path = poco_path.absolute().toString();
checkCreationIsAllowed(context_global, db_dir_path, path, table_fd);
const std::string path = poco_path.absolute().toString();
paths = LSWithRegexpMatching("/", path);
for (const auto & cur_path : paths)
checkCreationIsAllowed(context_global, db_dir_path, cur_path, table_fd);
is_db_table = false;
}
else /// Is DB's file
@ -99,14 +156,18 @@ StorageFile::StorageFile(
if (db_dir_path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
path = getTablePath(db_dir_path, table_name, format_name);
paths = {getTablePath(db_dir_path, table_name, format_name)};
is_db_table = true;
Poco::File(Poco::Path(path).parent()).createDirectories();
Poco::File(Poco::Path(paths.back()).parent()).createDirectories();
}
}
else /// Will use FD
{
checkCreationIsAllowed(context_global, db_dir_path, path, table_fd);
if (paths.size() != 1)
throw Exception("Table '" + table_name + "' is in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
checkCreationIsAllowed(context_global, db_dir_path, paths[0], table_fd);
is_db_table = false;
use_table_fd = true;
@ -121,7 +182,7 @@ StorageFile::StorageFile(
class StorageFileBlockInputStream : public IBlockInputStream
{
public:
StorageFileBlockInputStream(StorageFile & storage_, const Context & context, UInt64 max_block_size)
StorageFileBlockInputStream(StorageFile & storage_, const Context & context, UInt64 max_block_size, std::string file_path)
: storage(storage_)
{
if (storage.use_table_fd)
@ -147,8 +208,7 @@ public:
else
{
shared_lock = std::shared_lock(storage.rwlock);
read_buf = std::make_unique<ReadBufferFromFile>(storage.path);
read_buf = std::make_unique<ReadBufferFromFile>(file_path);
}
reader = FormatFactory::instance().getInput(storage.format_name, *read_buf, storage.getSampleBlock(), context, max_block_size);
@ -195,12 +255,16 @@ BlockInputStreams StorageFile::read(
size_t max_block_size,
unsigned /*num_streams*/)
{
BlockInputStreamPtr block_input = std::make_shared<StorageFileBlockInputStream>(*this, context, max_block_size);
const ColumnsDescription & columns_ = getColumns();
auto column_defaults = columns_.getDefaults();
if (column_defaults.empty())
return {block_input};
return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)};
BlockInputStreams blocks_input;
blocks_input.reserve(paths.size());
for (const auto & file_path : paths)
{
BlockInputStreamPtr cur_block = std::make_shared<StorageFileBlockInputStream>(*this, context, max_block_size, file_path);
blocks_input.push_back(column_defaults.empty() ? cur_block : std::make_shared<AddingDefaultsBlockInputStream>(cur_block, column_defaults, context));
}
return blocks_input;
}
@ -210,6 +274,8 @@ public:
explicit StorageFileBlockOutputStream(StorageFile & storage_)
: storage(storage_), lock(storage.rwlock)
{
if (storage.paths.size() != 1)
throw Exception("Table '" + storage.table_name + "' is in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
if (storage.use_table_fd)
{
/** NOTE: Using a real file bound to an FD may be misleading:
@ -221,7 +287,7 @@ public:
}
else
{
write_buf = std::make_unique<WriteBufferFromFile>(storage.path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
write_buf = std::make_unique<WriteBufferFromFile>(storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
}
writer = FormatFactory::instance().getOutput(storage.format_name, *write_buf, storage.getSampleBlock(), storage.context_global);
@ -263,19 +329,28 @@ BlockOutputStreamPtr StorageFile::write(
return std::make_shared<StorageFileBlockOutputStream>(*this);
}
String StorageFile::getDataPath() const
{
if (paths.empty())
throw Exception("Table '" + table_name + "' is in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
return paths[0];
}
void StorageFile::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
if (!is_db_table)
throw Exception("Can't rename table '" + table_name + "' binded to user-defined file (or FD)", ErrorCodes::DATABASE_ACCESS_DENIED);
if (paths.size() != 1)
throw Exception("Can't rename table '" + table_name + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
std::unique_lock<std::shared_mutex> lock(rwlock);
std::string path_new = getTablePath(new_path_to_db, new_table_name, format_name);
Poco::File(Poco::Path(path_new).parent()).createDirectories();
Poco::File(path).renameTo(path_new);
Poco::File(paths[0]).renameTo(path_new);
path = std::move(path_new);
paths[0] = std::move(path_new);
table_name = new_table_name;
database_name = new_database_name;
}
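The net effect of the changes above is that a user-file path can expand globs into several files for reading (writes stay restricted to a single path). Two queries from the integration test added in this commit, which creates the matching files under the user files directory:

```sql
SELECT count(*) FROM file('strange_names/p.o.??n.t.s', 'TSV', 'text String, number Float64');
SELECT count(*) FROM file('file{0..9}{0..9}{0..9}', 'TSV', 'text String, number Float64');
```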

View File

@ -40,7 +40,7 @@ public:
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
String getDataPath() const override { return path; }
String getDataPath() const override;
protected:
friend class StorageFileBlockInputStream;
@ -68,9 +68,10 @@ private:
std::string format_name;
Context & context_global;
std::string path;
int table_fd = -1;
std::vector<std::string> paths{""};
bool is_db_table = true; /// Table is stored in real database, not user's file
bool use_table_fd = false; /// Use table_fd instead of path
std::atomic<bool> table_fd_was_used{false}; /// To detect repeating reads from stdin

View File

@ -9,12 +9,17 @@
#include <Parsers/ASTLiteral.h>
#include <IO/ReadBufferFromHDFS.h>
#include <IO/WriteBufferFromHDFS.h>
#include <IO/HDFSCommon.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Common/parseGlobs.h>
#include <Poco/URI.h>
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <hdfs/hdfs.h>
namespace DB
{
@ -129,6 +134,51 @@ private:
BlockOutputStreamPtr writer;
};
/* Recursive directory listing with matched paths as a result.
* The same method exists in StorageFile.
*/
Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match)
{
const size_t first_glob = for_match.find_first_of("*?{");
const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
const String suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begins with '/'
const String prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs); /// ends with '/'
const size_t next_slash = suffix_with_globs.find('/', 1);
re2::RE2 matcher(makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash)));
HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(fs.get(), prefix_without_globs.data(), &ls.length);
Strings result;
for (int i = 0; i < ls.length; ++i)
{
const String full_path = String(ls.file_info[i].mName);
const size_t last_slash = full_path.rfind('/');
const String file_name = full_path.substr(last_slash);
const bool looking_for_directory = next_slash != std::string::npos;
const bool is_directory = ls.file_info[i].mKind == 'D';
/// The type of the current file_info entry determines what kind of path we are dealing with in the current iteration of the listing
if (!is_directory && !looking_for_directory)
{
if (re2::RE2::FullMatch(file_name, matcher))
{
result.push_back(String(ls.file_info[i].mName));
}
}
else if (is_directory && looking_for_directory)
{
if (re2::RE2::FullMatch(file_name, matcher))
{
Strings result_part = LSWithRegexpMatching(full_path + "/", fs, suffix_with_globs.substr(next_slash));
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
}
}
return result;
}
}
@ -140,12 +190,22 @@ BlockInputStreams StorageHDFS::read(
size_t max_block_size,
unsigned /*num_streams*/)
{
return {std::make_shared<HDFSBlockInputStream>(
uri,
format_name,
getSampleBlock(),
context_,
max_block_size)};
const size_t begin_of_path = uri.find('/', uri.find("//") + 2);
const String path_from_uri = uri.substr(begin_of_path);
const String uri_without_path = uri.substr(0, begin_of_path);
HDFSBuilderPtr builder = createHDFSBuilder(uri_without_path + "/");
HDFSFSPtr fs = createHDFSFS(builder.get());
const Strings res_paths = LSWithRegexpMatching("/", fs, path_from_uri);
BlockInputStreams result;
for (const auto & res_path : res_paths)
{
result.push_back(std::make_shared<HDFSBlockInputStream>(uri_without_path + res_path, format_name, getSampleBlock(), context_,
max_block_size));
}
return result;
}
void StorageHDFS::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)

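The same glob expansion drives reading through the HDFS engine and the `hdfs` table function. Two examples drawn from the HDFS integration tests added in this commit (host and paths are those used by the test cluster):

```sql
CREATE TABLE HDFSStorageWithRange (id UInt32, name String, weight Float64)
    ENGINE = HDFS('hdfs://hdfs1:9000/storage{1..5}', 'TSV');
SELECT count(*) FROM HDFSStorageWithRange;

SELECT * FROM hdfs('hdfs://hdfs1:9000/dir_for_test_with_globs/some_dir/*/file', 'TSV', 'id UInt64, text String, number Float64');
```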
View File

@ -0,0 +1,115 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node')
path_to_userfiles_from_defaut_config = "/var/lib/clickhouse/user_files/" # should be the same as in config file
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_strange_filenames(start_cluster):
# 2 rows data
some_data = "\t111.222\nData\t333.444"
node.exec_in_container(['bash', '-c', 'mkdir {}strange_names/'.format(path_to_userfiles_from_defaut_config)], privileged=True, user='root')
files = ["p.o.i.n.t.s",
"b}{ra{ces",
"b}.o{t.h"]
# the filename is embedded in the test data to simplify debugging
for filename in files:
node.exec_in_container(['bash', '-c', 'echo "{}{}" > {}strange_names/{}'.format(filename, some_data, path_to_userfiles_from_defaut_config, filename)], privileged=True, user='root')
test_requests = [("p.o.??n.t.s", "2"),
("p.o.*t.s", "2"),
("b}{r?{ces", "2"),
("b}*ces", "2"),
("b}.?{t.h", "2")]
for pattern, value in test_requests:
assert node.query('''
select count(*) from file('strange_names/{}', 'TSV', 'text String, number Float64')
'''.format(pattern)) == '{}\n'.format(value)
assert node.query('''
select count(*) from file('{}strange_names/{}', 'TSV', 'text String, number Float64')
'''.format(path_to_userfiles_from_defaut_config, pattern)) == '{}\n'.format(value)
def test_linear_structure(start_cluster):
# 2 rows data
some_data = "\t123.456\nData\t789.012"
files = ["file1", "file2", "file3", "file4", "file5",
"file000", "file111", "file222", "file333", "file444",
"a_file", "b_file", "c_file", "d_file", "e_file",
"a_data", "b_data", "c_data", "d_data", "e_data"]
# the filename is embedded in the test data to simplify debugging
for filename in files:
node.exec_in_container(['bash', '-c', 'echo "{}{}" > {}{}'.format(filename, some_data, path_to_userfiles_from_defaut_config, filename)], privileged=True, user='root')
test_requests = [("file{0..9}", "10"),
("file?", "10"),
("file{0..9}{0..9}{0..9}", "10"),
("file???", "10"),
("file*", "20"),
("a_{file,data}", "4"),
("?_{file,data}", "20"),
("{a,b,c,d,e}_{file,data}", "20"),
("{a,b,c,d,e}?{file,data}", "20"),
("*", "40")]
for pattern, value in test_requests:
assert node.query('''
select count(*) from file('{}', 'TSV', 'text String, number Float64')
'''.format(pattern)) == '{}\n'.format(value)
assert node.query('''
select count(*) from file('{}{}', 'TSV', 'text String, number Float64')
'''.format(path_to_userfiles_from_defaut_config, pattern)) == '{}\n'.format(value)
def test_deep_structure(start_cluster):
# 2 rows data
some_data = "\t135.791\nData\t246.802"
dirs = ["directory1/", "directory2/", "some_more_dir/", "we/",
"directory1/big_dir/",
"directory1/dir1/", "directory1/dir2/", "directory1/dir3/",
"directory2/dir1/", "directory2/dir2/", "directory2/one_more_dir/",
"some_more_dir/yet_another_dir/",
"we/need/", "we/need/to/", "we/need/to/go/", "we/need/to/go/deeper/"]
for dir in dirs:
node.exec_in_container(['bash', '-c', 'mkdir {}{}'.format(path_to_userfiles_from_defaut_config, dir)], privileged=True, user='root')
# all directories that appear in files must be listed in dirs
files = []
for i in range(10):
for j in range(10):
for k in range(10):
files.append("directory1/big_dir/file"+str(i)+str(j)+str(k))
for dir in dirs:
files.append(dir+"file")
# the filename is embedded in the test data to simplify debugging
for filename in files:
node.exec_in_container(['bash', '-c', 'echo "{}{}" > {}{}'.format(filename, some_data, path_to_userfiles_from_defaut_config, filename)], privileged=True, user='root')
test_requests = [ ("directory{1..5}/big_dir/*", "2002"), ("directory{0..6}/big_dir/*{0..9}{0..9}{0..9}", "2000"),
("?", "0"),
("directory{0..5}/dir{1..3}/file", "10"), ("directory{0..5}/dir?/file", "10"),
("we/need/to/go/deeper/file", "2"), ("*/*/*/*/*/*", "2"), ("we/need/??/go/deeper/*?*?*?*?*", "2")]
for pattern, value in test_requests:
assert node.query('''
select count(*) from file('{}', 'TSV', 'text String, number Float64')
'''.format(pattern)) == '{}\n'.format(value)
assert node.query('''
select count(*) from file('{}{}', 'TSV', 'text String, number Float64')
'''.format(path_to_userfiles_from_defaut_config, pattern)) == '{}\n'.format(value)

View File

@ -28,15 +28,31 @@ def started_cluster():
cluster.shutdown()
def test_read_write_storage(started_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
node1.query("create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/simple_storage', 'TSV')")
node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
def test_read_write_storage_with_globs(started_cluster):
hdfs_api = HDFSApi("root")
for i in ["1", "2", "3"]:
hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
node1.query("create table HDFSStorageWithRange (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1..5}', 'TSV')")
node1.query("create table HDFSStorageWithEnum (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1,2,3,4,5}', 'TSV')")
node1.query("create table HDFSStorageWithQuestionMark (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage?', 'TSV')")
node1.query("create table HDFSStorageWithAsterisk (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage*', 'TSV')")
assert node1.query("select count(*) from HDFSStorageWithRange") == '3\n'
assert node1.query("select count(*) from HDFSStorageWithEnum") == '3\n'
assert node1.query("select count(*) from HDFSStorageWithQuestionMark") == '3\n'
assert node1.query("select count(*) from HDFSStorageWithAsterisk") == '3\n'
def test_read_write_table(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
@ -74,3 +90,27 @@ def test_bad_hdfs_uri(started_cluster):
except Exception as ex:
print ex
assert 'Unable to open HDFS file' in str(ex)
def test_globs_in_read_table(started_cluster):
hdfs_api = HDFSApi("root")
some_data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
globs_dir = "/dir_for_test_with_globs/"
files = ["dir1/dir_dir/file1", "dir2/file2", "simple_table_function", "dir/file", "some_dir/dir1/file", "some_dir/dir2/file", "some_dir/file", "table1_function", "table2_function", "table3_function"]
for filename in files:
hdfs_api.write_data(globs_dir + filename, some_data)
test_requests = [("dir{1..5}/dir_dir/file1", 1),
("*_table_functio?", 1),
("dir/fil?", 1),
("table{3..8}_function", 1),
("table{2..8}_function", 2),
("dir/*", 1),
("dir/*?*?*?*?*", 1),
("dir/*?*?*?*?*?*", 0),
("some_dir/*/file", 2),
("some_dir/dir?/*", 2),
("*/*/*", 3),
("?", 0)]
for pattern, value in test_requests:
assert node1.query("select * from hdfs('hdfs://hdfs1:9000" + globs_dir + pattern + "', 'TSV', 'id UInt64, text String, number Float64')") == value * some_data

View File

@ -0,0 +1,510 @@
*** table without fill to compare ***
2019-05-07 18 prh
2019-05-07 26 2ke
2019-05-08 28 otf
2019-05-09 25 798
2019-05-10 1 myj
2019-05-10 16 vp7
2019-05-11 18 3s2
2019-05-15 27 enb
2019-05-19 20 yfh
2019-05-23 15 01v
2019-05-23 29 72y
2019-05-24 13 sd0
2019-05-25 17 0ei
2019-05-30 18 3kd
2019-06-04 5 6az
*** date WITH FILL, val ***
2019-05-07 18 prh
2019-05-07 26 2ke
2019-05-08 28 otf
2019-05-09 25 798
2019-05-10 1 myj
2019-05-10 16 vp7
2019-05-11 18 3s2
2019-05-12 0
2019-05-13 0
2019-05-14 0
2019-05-15 27 enb
2019-05-16 0
2019-05-17 0
2019-05-18 0
2019-05-19 20 yfh
2019-05-20 0
2019-05-21 0
2019-05-22 0
2019-05-23 15 01v
2019-05-23 29 72y
2019-05-24 13 sd0
2019-05-25 17 0ei
2019-05-26 0
2019-05-27 0
2019-05-28 0
2019-05-29 0
2019-05-30 18 3kd
2019-05-31 0
2019-06-01 0
2019-06-02 0
2019-06-03 0
2019-06-04 5 6az
*** date WITH FILL FROM 2019-05-01 TO 2019-05-31, val WITH FILL ***
2019-05-01 0
2019-05-02 0
2019-05-03 0
2019-05-04 0
2019-05-05 0
2019-05-06 0
2019-05-07 18 prh
2019-05-07 19
2019-05-07 20
2019-05-07 21
2019-05-07 22
2019-05-07 23
2019-05-07 24
2019-05-07 25
2019-05-07 26 2ke
2019-05-08 28 otf
2019-05-09 25 798
2019-05-10 1 myj
2019-05-10 2
2019-05-10 3
2019-05-10 4
2019-05-10 5
2019-05-10 6
2019-05-10 7
2019-05-10 8
2019-05-10 9
2019-05-10 10
2019-05-10 11
2019-05-10 12
2019-05-10 13
2019-05-10 14
2019-05-10 15
2019-05-10 16 vp7
2019-05-11 18 3s2
2019-05-12 0
2019-05-13 0
2019-05-14 0
2019-05-15 27 enb
2019-05-16 0
2019-05-17 0
2019-05-18 0
2019-05-19 20 yfh
2019-05-20 0
2019-05-21 0
2019-05-22 0
2019-05-23 15 01v
2019-05-23 16
2019-05-23 17
2019-05-23 18
2019-05-23 19
2019-05-23 20
2019-05-23 21
2019-05-23 22
2019-05-23 23
2019-05-23 24
2019-05-23 25
2019-05-23 26
2019-05-23 27
2019-05-23 28
2019-05-23 29 72y
2019-05-24 13 sd0
2019-05-25 17 0ei
2019-05-26 0
2019-05-27 0
2019-05-28 0
2019-05-29 0
2019-05-30 18 3kd
2019-06-04 5 6az
*** date DESC WITH FILL, val WITH FILL FROM 1 TO 6 ***
2019-06-04 1
2019-06-04 2
2019-06-04 3
2019-06-04 4
2019-06-04 5 6az
2019-06-03 1
2019-06-03 2
2019-06-03 3
2019-06-03 4
2019-06-03 5
2019-06-02 1
2019-06-02 2
2019-06-02 3
2019-06-02 4
2019-06-02 5
2019-06-01 1
2019-06-01 2
2019-06-01 3
2019-06-01 4
2019-06-01 5
2019-05-31 1
2019-05-31 2
2019-05-31 3
2019-05-31 4
2019-05-31 5
2019-05-30 1
2019-05-30 2
2019-05-30 3
2019-05-30 4
2019-05-30 5
2019-05-30 18 3kd
2019-05-29 1
2019-05-29 2
2019-05-29 3
2019-05-29 4
2019-05-29 5
2019-05-28 1
2019-05-28 2
2019-05-28 3
2019-05-28 4
2019-05-28 5
2019-05-27 1
2019-05-27 2
2019-05-27 3
2019-05-27 4
2019-05-27 5
2019-05-26 1
2019-05-26 2
2019-05-26 3
2019-05-26 4
2019-05-26 5
2019-05-25 1
2019-05-25 2
2019-05-25 3
2019-05-25 4
2019-05-25 5
2019-05-25 17 0ei
2019-05-24 1
2019-05-24 2
2019-05-24 3
2019-05-24 4
2019-05-24 5
2019-05-24 13 sd0
2019-05-23 1
2019-05-23 2
2019-05-23 3
2019-05-23 4
2019-05-23 5
2019-05-23 15 01v
2019-05-23 29 72y
2019-05-22 1
2019-05-22 2
2019-05-22 3
2019-05-22 4
2019-05-22 5
2019-05-21 1
2019-05-21 2
2019-05-21 3
2019-05-21 4
2019-05-21 5
2019-05-20 1
2019-05-20 2
2019-05-20 3
2019-05-20 4
2019-05-20 5
2019-05-19 1
2019-05-19 2
2019-05-19 3
2019-05-19 4
2019-05-19 5
2019-05-19 20 yfh
2019-05-18 1
2019-05-18 2
2019-05-18 3
2019-05-18 4
2019-05-18 5
2019-05-17 1
2019-05-17 2
2019-05-17 3
2019-05-17 4
2019-05-17 5
2019-05-16 1
2019-05-16 2
2019-05-16 3
2019-05-16 4
2019-05-16 5
2019-05-15 1
2019-05-15 2
2019-05-15 3
2019-05-15 4
2019-05-15 5
2019-05-15 27 enb
2019-05-14 1
2019-05-14 2
2019-05-14 3
2019-05-14 4
2019-05-14 5
2019-05-13 1
2019-05-13 2
2019-05-13 3
2019-05-13 4
2019-05-13 5
2019-05-12 1
2019-05-12 2
2019-05-12 3
2019-05-12 4
2019-05-12 5
2019-05-11 1
2019-05-11 2
2019-05-11 3
2019-05-11 4
2019-05-11 5
2019-05-11 18 3s2
2019-05-10 1 myj
2019-05-10 2
2019-05-10 3
2019-05-10 4
2019-05-10 5
2019-05-10 16 vp7
2019-05-09 1
2019-05-09 2
2019-05-09 3
2019-05-09 4
2019-05-09 5
2019-05-09 25 798
2019-05-08 1
2019-05-08 2
2019-05-08 3
2019-05-08 4
2019-05-08 5
2019-05-08 28 otf
2019-05-07 1
2019-05-07 2
2019-05-07 3
2019-05-07 4
2019-05-07 5
2019-05-07 18 prh
2019-05-07 26 2ke
*** date DESC WITH FILL TO 2019-05-01 STEP -2, val DESC WITH FILL FROM 10 TO -5 STEP -3 ***
2019-06-04 10
2019-06-04 7
2019-06-04 5 6az
2019-06-04 4
2019-06-04 1
2019-06-04 -2
2019-06-02 10
2019-06-02 7
2019-06-02 4
2019-06-02 1
2019-06-02 -2
2019-05-31 10
2019-05-31 7
2019-05-31 4
2019-05-31 1
2019-05-31 -2
2019-05-30 18 3kd
2019-05-29 10
2019-05-29 7
2019-05-29 4
2019-05-29 1
2019-05-29 -2
2019-05-27 10
2019-05-27 7
2019-05-27 4
2019-05-27 1
2019-05-27 -2
2019-05-25 17 0ei
2019-05-25 10
2019-05-25 7
2019-05-25 4
2019-05-25 1
2019-05-25 -2
2019-05-24 13 sd0
2019-05-23 29 72y
2019-05-23 15 01v
2019-05-23 10
2019-05-23 7
2019-05-23 4
2019-05-23 1
2019-05-23 -2
2019-05-21 10
2019-05-21 7
2019-05-21 4
2019-05-21 1
2019-05-21 -2
2019-05-19 20 yfh
2019-05-19 10
2019-05-19 7
2019-05-19 4
2019-05-19 1
2019-05-19 -2
2019-05-17 10
2019-05-17 7
2019-05-17 4
2019-05-17 1
2019-05-17 -2
2019-05-15 27 enb
2019-05-15 10
2019-05-15 7
2019-05-15 4
2019-05-15 1
2019-05-15 -2
2019-05-13 10
2019-05-13 7
2019-05-13 4
2019-05-13 1
2019-05-13 -2
2019-05-11 18 3s2
2019-05-11 10
2019-05-11 7
2019-05-11 4
2019-05-11 1
2019-05-11 -2
2019-05-10 16 vp7
2019-05-10 1 myj
2019-05-09 25 798
2019-05-09 10
2019-05-09 7
2019-05-09 4
2019-05-09 1
2019-05-09 -2
2019-05-08 28 otf
2019-05-07 26 2ke
2019-05-07 18 prh
2019-05-07 10
2019-05-07 7
2019-05-07 4
2019-05-07 1
2019-05-07 -2
2019-05-05 10
2019-05-05 7
2019-05-05 4
2019-05-05 1
2019-05-05 -2
2019-05-03 10
2019-05-03 7
2019-05-03 4
2019-05-03 1
2019-05-03 -2
2019-05-01 10
2019-05-01 7
2019-05-01 4
2019-05-01 1
2019-05-01 -2
*** date WITH FILL TO 2019-06-23 STEP 3, val WITH FILL FROM -10 STEP 2
2019-05-07 -10
2019-05-07 -8
2019-05-07 -6
2019-05-07 -4
2019-05-07 -2
2019-05-07 0
2019-05-07 2
2019-05-07 4
2019-05-07 6
2019-05-07 8
2019-05-07 10
2019-05-07 12
2019-05-07 14
2019-05-07 16
2019-05-07 18 prh
2019-05-07 20
2019-05-07 22
2019-05-07 24
2019-05-07 26 2ke
2019-05-08 28 otf
2019-05-09 25 798
2019-05-10 -10
2019-05-10 -8
2019-05-10 -6
2019-05-10 -4
2019-05-10 -2
2019-05-10 0
2019-05-10 1 myj
2019-05-10 2
2019-05-10 4
2019-05-10 6
2019-05-10 8
2019-05-10 10
2019-05-10 12
2019-05-10 14
2019-05-10 16 vp7
2019-05-11 18 3s2
2019-05-13 -10
2019-05-15 27 enb
2019-05-16 -10
2019-05-19 -10
2019-05-19 -8
2019-05-19 -6
2019-05-19 -4
2019-05-19 -2
2019-05-19 0
2019-05-19 2
2019-05-19 4
2019-05-19 6
2019-05-19 8
2019-05-19 10
2019-05-19 12
2019-05-19 14
2019-05-19 16
2019-05-19 18
2019-05-19 20 yfh
2019-05-22 -10
2019-05-23 15 01v
2019-05-23 29 72y
2019-05-24 13 sd0
2019-05-25 -10
2019-05-25 -8
2019-05-25 -6
2019-05-25 -4
2019-05-25 -2
2019-05-25 0
2019-05-25 2
2019-05-25 4
2019-05-25 6
2019-05-25 8
2019-05-25 10
2019-05-25 12
2019-05-25 14
2019-05-25 16
2019-05-25 17 0ei
2019-05-28 -10
2019-05-30 18 3kd
2019-05-31 -10
2019-06-03 -10
2019-06-04 5 6az
2019-06-06 -10
2019-06-09 -10
2019-06-12 -10
2019-06-15 -10
2019-06-18 -10
2019-06-21 -10
*** table without fill to compare ***
1 -2
1 3
3 2
5 -1
6 5
8 0
*** a WITH FILL, b WITH fill ***
1 -2
1 -1
1 0
1 1
1 2
1 3
2 0
3 2
4 0
5 -1
6 5
7 0
8 0
*** a WITH FILL, b WITH fill TO 6 STEP 2 ***
1 -2
1 0
1 2
1 3
1 4
2 0
3 2
3 4
4 0
5 -1
5 1
5 3
5 5
6 5
7 0
8 0
8 2
8 4

View File

@ -0,0 +1,45 @@
DROP TABLE IF EXISTS fill;
CREATE TABLE fill (date Date, val Int, str String) ENGINE = Memory;
INSERT INTO fill VALUES (toDate('2019-05-24'), 13, 'sd0')(toDate('2019-05-10'), 16, 'vp7')(toDate('2019-05-25'), 17, '0ei')(toDate('2019-05-30'), 18, '3kd')(toDate('2019-05-15'), 27, 'enb')(toDate('2019-06-04'), 5, '6az')(toDate('2019-05-23'), 15, '01v')(toDate('2019-05-08'), 28, 'otf')(toDate('2019-05-19'), 20, 'yfh')(toDate('2019-05-07'), 26, '2ke')(toDate('2019-05-07'), 18, 'prh')(toDate('2019-05-09'), 25, '798')(toDate('2019-05-10'), 1, 'myj')(toDate('2019-05-11'), 18, '3s2')(toDate('2019-05-23'), 29, '72y');
SELECT '*** table without fill to compare ***';
SELECT * FROM fill ORDER BY date, val;
-- Some useful cases
SELECT '*** date WITH FILL, val ***';
SELECT * FROM fill ORDER BY date WITH FILL, val;
SELECT '*** date WITH FILL FROM 2019-05-01 TO 2019-05-31, val WITH FILL ***';
SELECT * FROM fill ORDER BY date WITH FILL FROM toDate('2019-05-01') TO toDate('2019-05-31'), val WITH FILL;
SELECT '*** date DESC WITH FILL, val WITH FILL FROM 1 TO 6 ***';
SELECT * FROM fill ORDER BY date DESC WITH FILL, val WITH FILL FROM 1 TO 6;
-- Some weird cases
SELECT '*** date DESC WITH FILL TO 2019-05-01 STEP -2, val DESC WITH FILL FROM 10 TO -5 STEP -3 ***';
SELECT * FROM fill ORDER BY date DESC WITH FILL TO toDate('2019-05-01') STEP -2, val DESC WITH FILL FROM 10 TO -5 STEP -3;
SELECT '*** date WITH FILL TO 2019-06-23 STEP 3, val WITH FILL FROM -10 STEP 2';
SELECT * FROM fill ORDER BY date WITH FILL TO toDate('2019-06-23') STEP 3, val WITH FILL FROM -10 STEP 2;
DROP TABLE fill;
CREATE TABLE fill (a UInt32, b Int32) ENGINE = Memory;
INSERT INTO fill VALUES (1, -2), (1, 3), (3, 2), (5, -1), (6, 5), (8, 0);
SELECT '*** table without fill to compare ***';
SELECT * FROM fill ORDER BY a, b;
SELECT '*** a WITH FILL, b WITH fill ***';
SELECT * FROM fill ORDER BY a WITH FILL, b WITH fill;
SELECT '*** a WITH FILL, b WITH fill TO 6 STEP 2 ***';
SELECT * FROM fill ORDER BY a WITH FILL, b WITH fill TO 6 STEP 2;
SELECT * FROM fill ORDER BY a WITH FILL STEP -1; -- { serverError 475 }
SELECT * FROM fill ORDER BY a WITH FILL FROM 10 TO 1; -- { serverError 475 }
SELECT * FROM fill ORDER BY a DESC WITH FILL FROM 1 TO 10; -- { serverError 475 }
SELECT * FROM fill ORDER BY a WITH FILL FROM -10 to 10; -- { serverError 475 }
DROP TABLE fill;

View File

@ -0,0 +1,52 @@
1
1
*
1
1
2
2
2
2
*
1
1
2
2
2
2
*
1
*
1
2
2
2
2
*
1
1
*
2
2
2
2
*
1
1
2
2
2
2
*
1
1
*
2
2
2
2
*
2
2
2
*

View File

@ -0,0 +1,35 @@
DROP TABLE IF EXISTS ties;
CREATE TABLE ties (a Int) ENGINE = Memory;
-- SET experimental_use_processors=1;
INSERT INTO ties VALUES (1), (1), (2), (2), (2), (2), (3), (3);
SELECT a FROM ties order by a limit 1 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 3 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 5 with ties;
SELECT '*';
SET max_block_size = 2;
SELECT a FROM ties order by a limit 1, 1 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 1, 2 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 2 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 2, 3 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 4 with ties;
SELECT '*';
SET max_block_size = 3;
SELECT a FROM ties order by a limit 1 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 2, 3 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 3, 2 with ties;
SELECT '*';
DROP TABLE ties;

View File

@ -934,7 +934,7 @@ You can select data from a ClickHouse table and save them into some file in the
clickhouse-client --query="SELECT * FROM {some_table} FORMAT Parquet" > {some_file.pq}
```
To exchange data with the Hadoop, you can use `HDFS` table engine.
To exchange data with Hadoop, you can use the [`HDFS` table engine](../../operations/table_engines/hdfs.md).
## Format Schema {#formatschema}

View File

@ -27,7 +27,7 @@ When creating table using `File(Format)` it creates empty subdirectory in that f
You may manually create this subfolder and file in server filesystem and then [ATTACH](../../query_language/misc.md) it to table information with matching name, so you can query data from that file.
!!! warning
Be careful with this funcionality, because ClickHouse does not keep track of external changes to such files. The result of simultaneous writes via ClickHouse and outside of ClickHouse is undefined.
Be careful with this functionality, because ClickHouse does not keep track of external changes to such files. The result of simultaneous writes via ClickHouse and outside of ClickHouse is undefined.
**Example:**
@ -73,9 +73,9 @@ $ echo -e "1,2\n3,4" | clickhouse-local -q "CREATE TABLE table (a Int64, b Int64
- Multiple `SELECT` queries can be performed concurrently, but `INSERT` queries will wait each other.
- Not supported:
- `ALTER`
- `SELECT ... SAMPLE`
- Indices
- Replication
- `ALTER`
- `SELECT ... SAMPLE`
- Indices
- Replication
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/file/) <!--hide-->

View File

@ -0,0 +1,51 @@
# HDFS {#table_engines-hdfs}
This engine provides integration with the [Apache Hadoop](https://en.wikipedia.org/wiki/Apache_Hadoop) ecosystem by allowing you to manage data on [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) via ClickHouse. This engine is similar
to the [File](file.md) and [URL](url.md) engines, but provides Hadoop-specific features.
## Usage
```
ENGINE = HDFS(URI, format)
```
The `URI` parameter is the whole file URI in HDFS.
The `format` parameter specifies one of the available file formats. To perform
`SELECT` queries, the format must be supported for input, and to perform
`INSERT` queries, it must be supported for output. The available formats are listed in the
[Formats](../../interfaces/formats.md#formats) section.
**Example:**
**1.** Set up the `hdfs_engine_table` table:
``` sql
CREATE TABLE hdfs_engine_table (name String, value UInt32) ENGINE=HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')
```
**2.** Fill file:
``` sql
INSERT INTO hdfs_engine_table VALUES ('one', 1), ('two', 2), ('three', 3)
```
**3.** Query the data:
``` sql
SELECT * FROM hdfs_engine_table LIMIT 2
```
```
┌─name─┬─value─┐
│ one │ 1 │
│ two │ 2 │
└──────┴───────┘
```
## Implementation Details
- Reads and writes can be parallel
- Not supported:
- `ALTER` and `SELECT...SAMPLE` operations.
- Indexes.
- Replication.
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/hdfs/) <!--hide-->

View File

@ -45,6 +45,7 @@ Engines in the family:
- [MySQL](mysql.md)
- [ODBC](odbc.md)
- [JDBC](jdbc.md)
- [HDFS](hdfs.md)
### Special engines

View File

@ -9,9 +9,9 @@ file(path, format, structure)
**Input parameters**
- `path` — The relative path to the file from [user_files_path](../../operations/server_settings/settings.md#server_settings-user_files_path).
- `path` — The relative path to the file from [user_files_path](../../operations/server_settings/settings.md#server_settings-user_files_path). The path supports the following globs in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}`, where `N` and `M` are numbers and `'abc'`, `'def'` are strings.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format `'colunmn1_name column1_ype, column2_name column2_type, ...'`.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
**Returned value**
@ -51,4 +51,16 @@ LIMIT 2
SELECT * FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32') LIMIT 10
```
**Globs in path**
- `*` — Matches any number of any characters including none.
- `?` — Matches any single character.
- `{some_string,another_string,yet_another_one}` — Matches any of strings `'some_string', 'another_string', 'yet_another_one'`.
- `{N..M}` — Matches any number in the range from N to M, including both bounds.
!!! warning
If your listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
Multiple path components can have globs. To be processed, a file must exist and match the whole path pattern.
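**Example**
A sketch of glob usage, assuming files `some_dir/some_file_1`, `some_dir/some_file_2` and `some_dir/some_file_3` exist under `user_files_path` (the names and column types here are only an illustration):

```sql
SELECT count(*)
FROM file('some_dir/some_file_{1..3}', 'TSV', 'name String, value UInt32')
```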
[Original article](https://clickhouse.yandex/docs/en/query_language/table_functions/file/) <!--hide-->

View File

@ -0,0 +1,49 @@
# hdfs
Creates a table from a file in HDFS.
```
hdfs(URI, format, structure)
```
**Input parameters**
- `URI` — The whole file URI in HDFS. The path part supports the following globs in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}`, where `N` and `M` are numbers and `'abc'`, `'def'` are strings.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
**Returned value**
A table with the specified structure for reading or writing data in the specified file.
**Example**
Table from `hdfs://hdfs1:9000/test` and selection of the first two rows from it:
```sql
SELECT *
FROM hdfs('hdfs://hdfs1:9000/test', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
LIMIT 2
```
```
┌─column1─┬─column2─┬─column3─┐
│ 1 │ 2 │ 3 │
│ 3 │ 2 │ 1 │
└─────────┴─────────┴─────────┘
```
**Globs in path**
- `*` — Matches any number of any characters including none.
- `?` — Matches any single character.
- `{some_string,another_string,yet_another_one}` — Matches any of strings `'some_string', 'another_string', 'yet_another_one'`.
- `{N..M}` — Matches any number in the range from N to M, including both bounds.
!!! warning
If your listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
Multiple path components can have globs. To be processed, a file must exist and match the whole path pattern.
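**Example**
A sketch of glob usage, assuming files `some_dir/some_file_1`, `some_dir/some_file_2` and `some_dir/some_file_3` exist in HDFS (the URI, names and column types here are only an illustration):

```sql
SELECT count(*)
FROM hdfs('hdfs://hdfs1:9000/some_dir/some_file_{1..3}', 'TSV', 'name String, value UInt32')
```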
[Original article](https://clickhouse.yandex/docs/en/query_language/table_functions/hdfs/) <!--hide-->

View File

@ -927,7 +927,7 @@ cat {filename} | clickhouse-client --query="INSERT INTO {some_table} FORMAT Parq
clickhouse-client --query="SELECT * FROM {some_table} FORMAT Parquet" > {some_file.pq}
```
To exchange data with the Hadoop ecosystem, you can use the `HDFS` and `URL` table engines.
To exchange data with the Hadoop ecosystem, you can use the [`HDFS`](../../operations/table_engines/hdfs.md) and `URL` table engines.
## Схема формата {#formatschema}

View File

@ -0,0 +1,48 @@
# HDFS {#table_engines-hdfs}
Manages data in HDFS. This engine is similar to the [File](file.md) and [URL](url.md) engines.
## Engine Usage
```
ENGINE = HDFS(URI, format)
```
The `URI` parameter must be the whole file URI in HDFS.
The `format` parameter must be one that ClickHouse can use both in `INSERT` and in `SELECT` queries. The full list of supported formats is in the [Formats](../../interfaces/formats.md#formats) section.
**Example:**
**1.** Create the `hdfs_engine_table` table on the server:
``` sql
CREATE TABLE hdfs_engine_table (name String, value UInt32) ENGINE=HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')
```
**2.** Fill the file:
``` sql
INSERT INTO hdfs_engine_table VALUES ('one', 1), ('two', 2), ('three', 3)
```
**3.** Query the data:
``` sql
SELECT * FROM hdfs_engine_table LIMIT 2
```
```
┌─name─┬─value─┐
│ one │ 1 │
│ two │ 2 │
└──────┴───────┘
```
## Implementation Details
- Multithreaded reads and writes are supported.
- Not supported:
- `ALTER` and `SELECT...SAMPLE` operations;
- indexes;
- replication.
[Original article](https://clickhouse.yandex/docs/ru/operations/table_engines/hdfs/) <!--hide-->

View File

@ -9,7 +9,7 @@ file(path, format, structure)
**Input parameters**
- `path` — The relative path to the file from [user_files_path](../../operations/server_settings/settings.md#server_settings-user_files_path).
- `path` — The relative path to the file from [user_files_path](../../operations/server_settings/settings.md#server_settings-user_files_path). The path supports the following globs in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}`, where `N` and `M` are numbers and `'abc'`, `'def'` are strings.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — The table structure. Format: `'column1_name column1_type, column2_name column2_type, ...'`.
@ -45,4 +45,16 @@ LIMIT 2
└─────────┴─────────┴─────────┘
```
**Globs in the path**
- `*` — Matches any number of any characters, including none.
- `?` — Matches exactly one arbitrary character.
- `{some_string,another_string,yet_another_one}` — Matches any of the strings `'some_string'`, `'another_string'`, `'yet_another_one'`.
- `{N..M}` — Matches any number in the range from `N` to `M`, including both bounds.
!!! warning
If your list of files contains number ranges with leading zeros, use the brace construction for each digit separately or use `?`.
Globs may appear in different parts of the path. Only files that exist in the filesystem and match the whole path pattern will be processed.
[Original article](https://clickhouse.yandex/docs/ru/query_language/table_functions/file/) <!--hide-->

View File

@ -0,0 +1,48 @@
# hdfs
Creates a table from a file in HDFS.
```
hdfs(URI, format, structure)
```
**Input parameters**
- `URI` — The URI of the file in HDFS. The path supports the following globs in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}`, where `N` and `M` are numbers and `'abc'`, `'def'` are strings.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — The table structure. Format: `'column1_name column1_type, column2_name column2_type, ...'`.
**Returned value**
A table with the specified structure for reading or writing data in the specified file.
**Example**
Table from `hdfs://hdfs1:9000/test` and selection of the first two rows from it:
``` sql
SELECT *
FROM hdfs('hdfs://hdfs1:9000/test', 'TSV', 'column1 UInt32, column2 UInt32, column3 UInt32')
LIMIT 2
```
```
┌─column1─┬─column2─┬─column3─┐
│ 1 │ 2 │ 3 │
│ 3 │ 2 │ 1 │
└─────────┴─────────┴─────────┘
```
**Globs in the path**
- `*` — Matches any number of any characters, including none.
- `?` — Matches exactly one arbitrary character.
- `{some_string,another_string,yet_another_one}` — Matches any of the strings `'some_string'`, `'another_string'`, `'yet_another_one'`.
- `{N..M}` — Matches any number in the range from `N` to `M`, including both bounds.
!!! warning
If your list of files contains number ranges with leading zeros, use the brace construction for each digit separately or use `?`.
Globs may appear in different parts of the path. Only files that exist in the filesystem and match the whole path pattern will be processed.
[Original article](https://clickhouse.yandex/docs/ru/query_language/table_functions/hdfs/) <!--hide-->