Merge remote-tracking branch 'upstream/master' into memory-profiler

This commit is contained in:
Ivan Lezhankin 2020-01-30 14:36:59 +03:00
commit d36f082094
58 changed files with 457 additions and 230 deletions

View File

@ -1,5 +1,7 @@
## ClickHouse release v20.1
### ClickHouse release v20.1.2.4, 2020-01-22
### Backward Incompatible Change
* Make the setting `merge_tree_uniform_read_distribution` obsolete. The server still recognizes this setting but it has no effect. [#8308](https://github.com/ClickHouse/ClickHouse/pull/8308) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Changed return type of the function `greatCircleDistance` to `Float32` because now the result of calculation is `Float32`. [#7993](https://github.com/ClickHouse/ClickHouse/pull/7993) ([alexey-milovidov](https://github.com/alexey-milovidov))
@ -7,6 +9,7 @@
* Enable the `use_minimalistic_part_header_in_zookeeper` setting for `ReplicatedMergeTree` by default. This will significantly reduce the amount of data stored in ZooKeeper. The setting has been supported since version 19.1, and we have already used it in production in multiple services without any issues for more than half a year. Disable this setting if you have a chance to downgrade to versions older than 19.1. [#6850](https://github.com/ClickHouse/ClickHouse/pull/6850) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Data skipping indices are production ready and enabled by default. The settings `allow_experimental_data_skipping_indices`, `allow_experimental_cross_to_join_conversion` and `allow_experimental_multiple_joins_emulation` are now obsolete and do nothing. [#7974](https://github.com/ClickHouse/ClickHouse/pull/7974) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Add new `ANY JOIN` logic for `StorageJoin` consistent with the `JOIN` operation. To upgrade without changes in behaviour, you need to add `SETTINGS any_join_distinct_right_table_keys = 1` to the metadata of Engine Join tables or recreate these tables after the upgrade. [#8400](https://github.com/ClickHouse/ClickHouse/pull/8400) ([Artem Zuikov](https://github.com/4ertus2))
* Require server to be restarted to apply the changes in logging configuration. This is a temporary workaround to avoid the bug where the server logs to a deleted log file (see [#8696](https://github.com/ClickHouse/ClickHouse/issues/8696)). [#8707](https://github.com/ClickHouse/ClickHouse/pull/8707) ([Alexander Kuzmenkov](https://github.com/akuzm))
### New Feature
* Added information about part paths to `system.merges`. [#8043](https://github.com/ClickHouse/ClickHouse/pull/8043) ([Vladimir Chebotarev](https://github.com/excitoon))

View File

@ -9,7 +9,10 @@ cat "$QUERIES_FILE" | sed "s|{table}|\"${TABLE}\"|g" | while read query; do
echo -n "["
for i in $(seq 1 $TRIES); do
while true; do
RES=$(command time -f %e -o /dev/stdout curl -sS --location-trusted -H "Authorization: OAuth $YT_TOKEN" "$YT_PROXY.yt.yandex.net/query?default_format=Null&database=*$YT_CLIQUE_ID" --data-binary @- <<< "$query" 2>/dev/null) && break;
RES=$(command time -f %e -o /dev/stdout curl -sS -G --data-urlencode "query=$query" --data "default_format=Null&max_memory_usage=100000000000&max_memory_usage_for_all_queries=100000000000&max_concurrent_queries_for_user=100&database=*$YT_CLIQUE_ID" --location-trusted -H "Authorization: OAuth $YT_TOKEN" "$YT_PROXY.yt.yandex.net/query" 2>/dev/null);
if [[ $? == 0 ]]; then
[[ $RES =~ 'fail|Exception' ]] || break;
fi
done
[[ "$?" == "0" ]] && echo -n "${RES}" || echo -n "null"

View File

@ -34,10 +34,10 @@ SELECT WatchID, ClientIP, count() AS c, sum(Refresh), avg(ResolutionWidth) FROM
SELECT URL, count() AS c FROM {table} GROUP BY URL ORDER BY c DESC LIMIT 10;
SELECT 1, URL, count() AS c FROM {table} GROUP BY 1, URL ORDER BY c DESC LIMIT 10;
SELECT ClientIP AS x, x - 1, x - 2, x - 3, count() AS c FROM {table} GROUP BY x, x - 1, x - 2, x - 3 ORDER BY c DESC LIMIT 10;
SELECT URL, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= toDate('2013-07-01') AND EventDate <= toDate('2013-07-31') AND NOT DontCountHits AND NOT Refresh AND notEmpty(URL) GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
SELECT Title, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= toDate('2013-07-01') AND EventDate <= toDate('2013-07-31') AND NOT DontCountHits AND NOT Refresh AND notEmpty(Title) GROUP BY Title ORDER BY PageViews DESC LIMIT 10;
SELECT URL, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= toDate('2013-07-01') AND EventDate <= toDate('2013-07-31') AND NOT Refresh AND IsLink AND NOT IsDownload GROUP BY URL ORDER BY PageViews DESC LIMIT 1000;
SELECT TraficSourceID, SearchEngineID, AdvEngineID, ((SearchEngineID = 0 AND AdvEngineID = 0) ? Referer : '') AS Src, URL AS Dst, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= toDate('2013-07-01') AND EventDate <= toDate('2013-07-31') AND NOT Refresh GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 1000;
SELECT URLHash, EventDate, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= toDate('2013-07-01') AND EventDate <= toDate('2013-07-31') AND NOT Refresh AND TraficSourceID IN (-1, 6) AND RefererHash = halfMD5('http://example.ru/') GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 100;
SELECT WindowClientWidth, WindowClientHeight, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= toDate('2013-07-01') AND EventDate <= toDate('2013-07-31') AND NOT Refresh AND NOT DontCountHits AND URLHash = halfMD5('http://example.ru/') GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10000;
SELECT toStartOfMinute(EventTime) AS Minute, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= toDate('2013-07-01') AND EventDate <= toDate('2013-07-02') AND NOT Refresh AND NOT DontCountHits GROUP BY Minute ORDER BY Minute;
SELECT URL, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND NOT DontCountHits AND NOT Refresh AND notEmpty(URL) GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
SELECT Title, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND NOT DontCountHits AND NOT Refresh AND notEmpty(Title) GROUP BY Title ORDER BY PageViews DESC LIMIT 10;
SELECT URL, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND NOT Refresh AND IsLink AND NOT IsDownload GROUP BY URL ORDER BY PageViews DESC LIMIT 1000;
SELECT TraficSourceID, SearchEngineID, AdvEngineID, ((SearchEngineID = 0 AND AdvEngineID = 0) ? Referer : '') AS Src, URL AS Dst, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND NOT Refresh GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 1000;
SELECT URLHash, EventDate, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND NOT Refresh AND TraficSourceID IN (-1, 6) AND RefererHash = halfMD5('http://example.ru/') GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 100;
SELECT WindowClientWidth, WindowClientHeight, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND NOT Refresh AND NOT DontCountHits AND URLHash = halfMD5('http://example.ru/') GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10000;
SELECT toStartOfMinute(EventTime) AS Minute, count() AS PageViews FROM {table} WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-02' AND NOT Refresh AND NOT DontCountHits GROUP BY Minute ORDER BY Minute;

View File

@ -65,6 +65,8 @@ void Suggest::loadImpl(Connection & connection, const ConnectionTimeouts & timeo
" UNION ALL "
"SELECT name FROM system.data_type_families"
" UNION ALL "
"SELECT name FROM system.merge_tree_settings"
" UNION ALL "
"SELECT name FROM system.settings"
" UNION ALL "
"SELECT cluster FROM system.clusters"

View File

@ -184,7 +184,7 @@ template <> constexpr bool isDecimalField<DecimalField<Decimal128>>() { return t
class FieldVisitorAccurateEquals : public StaticVisitor<bool>
{
public:
bool operator() (const UInt64 & l, const Null & r) const { return cantCompare(l, r); }
bool operator() (const UInt64 &, const Null &) const { return false; }
bool operator() (const UInt64 & l, const UInt64 & r) const { return l == r; }
bool operator() (const UInt64 & l, const UInt128 & r) const { return cantCompare(l, r); }
bool operator() (const UInt64 & l, const Int64 & r) const { return accurate::equalsOp(l, r); }
@ -194,7 +194,7 @@ public:
bool operator() (const UInt64 & l, const Tuple & r) const { return cantCompare(l, r); }
bool operator() (const UInt64 & l, const AggregateFunctionStateData & r) const { return cantCompare(l, r); }
bool operator() (const Int64 & l, const Null & r) const { return cantCompare(l, r); }
bool operator() (const Int64 &, const Null &) const { return false; }
bool operator() (const Int64 & l, const UInt64 & r) const { return accurate::equalsOp(l, r); }
bool operator() (const Int64 & l, const UInt128 & r) const { return cantCompare(l, r); }
bool operator() (const Int64 & l, const Int64 & r) const { return l == r; }
@ -204,7 +204,7 @@ public:
bool operator() (const Int64 & l, const Tuple & r) const { return cantCompare(l, r); }
bool operator() (const Int64 & l, const AggregateFunctionStateData & r) const { return cantCompare(l, r); }
bool operator() (const Float64 & l, const Null & r) const { return cantCompare(l, r); }
bool operator() (const Float64 &, const Null &) const { return false; }
bool operator() (const Float64 & l, const UInt64 & r) const { return accurate::equalsOp(l, r); }
bool operator() (const Float64 & l, const UInt128 & r) const { return cantCompare(l, r); }
bool operator() (const Float64 & l, const Int64 & r) const { return accurate::equalsOp(l, r); }
@ -227,6 +227,8 @@ public:
return l == r;
if constexpr (std::is_same_v<T, UInt128>)
return stringToUUID(l) == r;
if constexpr (std::is_same_v<T, Null>)
return false;
return cantCompare(l, r);
}
@ -237,6 +239,8 @@ public:
return l == r;
if constexpr (std::is_same_v<T, String>)
return l == stringToUUID(r);
if constexpr (std::is_same_v<T, Null>)
return false;
return cantCompare(l, r);
}
@ -245,6 +249,8 @@ public:
{
if constexpr (std::is_same_v<T, Array>)
return l == r;
if constexpr (std::is_same_v<T, Null>)
return false;
return cantCompare(l, r);
}
@ -253,6 +259,8 @@ public:
{
if constexpr (std::is_same_v<T, Tuple>)
return l == r;
if constexpr (std::is_same_v<T, Null>)
return false;
return cantCompare(l, r);
}
@ -263,6 +271,8 @@ public:
return l == r;
if constexpr (std::is_same_v<U, Int64> || std::is_same_v<U, UInt64>)
return l == DecimalField<Decimal128>(r, 0);
if constexpr (std::is_same_v<U, Null>)
return false;
return cantCompare(l, r);
}
@ -289,11 +299,10 @@ private:
}
};
class FieldVisitorAccurateLess : public StaticVisitor<bool>
{
public:
bool operator() (const UInt64 & l, const Null & r) const { return cantCompare(l, r); }
bool operator() (const UInt64 &, const Null &) const { return false; }
bool operator() (const UInt64 & l, const UInt64 & r) const { return l < r; }
bool operator() (const UInt64 & l, const UInt128 & r) const { return cantCompare(l, r); }
bool operator() (const UInt64 & l, const Int64 & r) const { return accurate::lessOp(l, r); }
@ -303,7 +312,7 @@ public:
bool operator() (const UInt64 & l, const Tuple & r) const { return cantCompare(l, r); }
bool operator() (const UInt64 & l, const AggregateFunctionStateData & r) const { return cantCompare(l, r); }
bool operator() (const Int64 & l, const Null & r) const { return cantCompare(l, r); }
bool operator() (const Int64 &, const Null &) const { return false; }
bool operator() (const Int64 & l, const UInt64 & r) const { return accurate::lessOp(l, r); }
bool operator() (const Int64 & l, const UInt128 & r) const { return cantCompare(l, r); }
bool operator() (const Int64 & l, const Int64 & r) const { return l < r; }
@ -313,7 +322,7 @@ public:
bool operator() (const Int64 & l, const Tuple & r) const { return cantCompare(l, r); }
bool operator() (const Int64 & l, const AggregateFunctionStateData & r) const { return cantCompare(l, r); }
bool operator() (const Float64 & l, const Null & r) const { return cantCompare(l, r); }
bool operator() (const Float64 &, const Null &) const { return false; }
bool operator() (const Float64 & l, const UInt64 & r) const { return accurate::lessOp(l, r); }
bool operator() (const Float64 & l, const UInt128 & r) const { return cantCompare(l, r); }
bool operator() (const Float64 & l, const Int64 & r) const { return accurate::lessOp(l, r); }
@ -336,6 +345,8 @@ public:
return l < r;
if constexpr (std::is_same_v<T, UInt128>)
return stringToUUID(l) < r;
if constexpr (std::is_same_v<T, Null>)
return false;
return cantCompare(l, r);
}
@ -346,6 +357,8 @@ public:
return l < r;
if constexpr (std::is_same_v<T, String>)
return l < stringToUUID(r);
if constexpr (std::is_same_v<T, Null>)
return false;
return cantCompare(l, r);
}
@ -354,6 +367,8 @@ public:
{
if constexpr (std::is_same_v<T, Array>)
return l < r;
if constexpr (std::is_same_v<T, Null>)
return false;
return cantCompare(l, r);
}
@ -362,6 +377,8 @@ public:
{
if constexpr (std::is_same_v<T, Tuple>)
return l < r;
if constexpr (std::is_same_v<T, Null>)
return false;
return cantCompare(l, r);
}
@ -370,8 +387,10 @@ public:
{
if constexpr (isDecimalField<U>())
return l < r;
else if constexpr (std::is_same_v<U, Int64> || std::is_same_v<U, UInt64>)
if constexpr (std::is_same_v<U, Int64> || std::is_same_v<U, UInt64>)
return l < DecimalField<Decimal128>(r, 0);
if constexpr (std::is_same_v<U, Null>)
return false;
return cantCompare(l, r);
}
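
Every change in this hunk follows one rule: a comparison in which either side is Null now returns false instead of going through cantCompare. A minimal standalone sketch of that dispatch, using std::variant and plain standard types in place of the real Field class (an illustration only, not the ClickHouse code):

#include <cassert>
#include <cstdint>
#include <string>
#include <variant>

struct Null {};
using Field = std::variant<Null, std::uint64_t, std::int64_t, double, std::string>;

struct AccurateEquals
{
    /// Any comparison that involves Null is simply false, as in the hunk above.
    template <typename T> bool operator()(const Null &, const T &) const { return false; }
    template <typename T> bool operator()(const T &, const Null &) const { return false; }
    bool operator()(const Null &, const Null &) const { return false; }

    /// Same-type comparison.
    template <typename T> bool operator()(const T & l, const T & r) const { return l == r; }

    /// Cross-type comparisons are omitted in this sketch (the real visitor uses accurate::equalsOp or cantCompare).
    template <typename T, typename U> bool operator()(const T &, const U &) const { return false; }
};

int main()
{
    Field a = std::uint64_t{42};
    Field null = Null{};
    assert(!std::visit(AccurateEquals{}, a, null));                      /// UInt64 vs Null -> false
    assert(std::visit(AccurateEquals{}, a, Field{std::uint64_t{42}}));   /// UInt64 vs UInt64 -> true
}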

View File

@ -265,7 +265,19 @@ static void toStringEveryLineImpl(const StackTrace::Frames & frames, size_t offs
uintptr_t virtual_offset = object ? uintptr_t(object->address_begin) : 0;
const void * physical_addr = reinterpret_cast<const void *>(uintptr_t(virtual_addr) - virtual_offset);
out << i << ". " << physical_addr << " ";
out << i << ". ";
if (object)
{
if (std::filesystem::exists(object->name))
{
auto dwarf_it = dwarfs.try_emplace(object->name, *object->elf).first;
DB::Dwarf::LocationInfo location;
if (dwarf_it->second.findAddress(uintptr_t(physical_addr), location, DB::Dwarf::LocationInfoMode::FAST))
out << location.file.toString() << ":" << location.line << ": ";
}
}
auto symbol = symbol_index.findSymbol(virtual_addr);
if (symbol)
@ -276,22 +288,8 @@ static void toStringEveryLineImpl(const StackTrace::Frames & frames, size_t offs
else
out << "?";
out << " ";
if (object)
{
if (std::filesystem::exists(object->name))
{
auto dwarf_it = dwarfs.try_emplace(object->name, *object->elf).first;
DB::Dwarf::LocationInfo location;
if (dwarf_it->second.findAddress(uintptr_t(physical_addr), location, DB::Dwarf::LocationInfoMode::FAST))
out << location.file.toString() << ":" << location.line;
}
out << " in " << object->name;
}
else
out << "?";
out << " @ " << physical_addr;
out << " in " << (object ? object->name : "?");
callback(out.str());
out.str({});
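
With this reordering, a symbolized frame is printed as index, source location (when the object file exists on disk and the DWARF lookup succeeds), symbol, physical address, and object name. A tiny standalone sketch of the resulting line layout, with made-up values (the real code obtains them from SymbolIndex and Dwarf):

#include <cstddef>
#include <iostream>
#include <sstream>
#include <string>

int main()
{
    std::ostringstream out;
    std::size_t i = 3;                                                    /// frame index
    std::string file = "src/Common/Example.cpp";                          /// hypothetical DWARF location
    std::size_t line = 42;
    std::string symbol = "DB::doWork()";                                  /// hypothetical demangled symbol
    const void * physical_addr = reinterpret_cast<const void *>(0x7f00);  /// hypothetical address
    std::string object = "/usr/bin/clickhouse";                           /// hypothetical binary path

    out << i << ". " << file << ":" << line << ": " << symbol
        << " @ " << physical_addr << " in " << object;
    std::cout << out.str() << '\n';   /// e.g. "3. src/Common/Example.cpp:42: DB::doWork() @ 0x7f00 in /usr/bin/clickhouse"
}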

View File

@ -203,6 +203,9 @@ protected:
/// Set to non-nullptr only if we have enough capabilities.
std::unique_ptr<TaskStatsInfoGetter> taskstats_getter;
private:
void setupState(const ThreadGroupStatusPtr & thread_group_);
};
}

View File

@ -17,6 +17,8 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_SLOW;
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
}
static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_in_seconds, UInt64 total_elapsed_microseconds)
@ -88,4 +90,29 @@ void ExecutionSpeedLimits::throttle(
}
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
bool ExecutionSpeedLimits::checkTimeLimit(UInt64 elapsed_ns, OverflowMode overflow_mode)
{
if (max_execution_time != 0
&& elapsed_ns > static_cast<UInt64>(max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(overflow_mode,
"Timeout exceeded: elapsed " + toString(static_cast<double>(elapsed_ns) / 1000000000ULL)
+ " seconds, maximum: " + toString(max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
}
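
Moving checkTimeLimit into ExecutionSpeedLimits lets IBlockInputStream and the processors pipeline (see the hunks below) share one time-limit policy: THROW raises TIMEOUT_EXCEEDED, while BREAK merely reports false so the caller can stop reading. A standalone model of that decision, with simplified types rather than the ClickHouse classes:

#include <cstdint>
#include <stdexcept>
#include <string>

enum class OverflowMode { THROW, BREAK };

static bool handleOverflowMode(OverflowMode mode, const std::string & message)
{
    switch (mode)
    {
        case OverflowMode::THROW: throw std::runtime_error(message);
        case OverflowMode::BREAK: return false;
    }
    throw std::logic_error("unknown overflow mode");
}

/// Returns true while execution may continue; max_execution_time_ns == 0 disables the check.
static bool checkTimeLimit(std::uint64_t elapsed_ns, std::uint64_t max_execution_time_ns, OverflowMode mode)
{
    if (max_execution_time_ns != 0 && elapsed_ns > max_execution_time_ns)
        return handleOverflowMode(mode,
            "Timeout exceeded: elapsed " + std::to_string(elapsed_ns / 1e9) + " seconds");
    return true;
}

int main()
{
    /// BREAK mode: stop reading gracefully instead of failing the whole query.
    if (!checkTimeLimit(5'000'000'000, 1'000'000'000, OverflowMode::BREAK))
        return 0;
}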

View File

@ -2,6 +2,7 @@
#include <Poco/Timespan.h>
#include <Core/Types.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
@ -23,6 +24,8 @@ public:
/// Pause execution in case speed limits were exceeded.
void throttle(size_t read_rows, size_t read_bytes, size_t total_rows_to_read, UInt64 total_elapsed_microseconds);
bool checkTimeLimit(UInt64 elapsed_ns, OverflowMode overflow_mode);
};
}

View File

@ -203,30 +203,9 @@ void IBlockInputStream::updateExtremes(Block & block)
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
bool IBlockInputStream::checkTimeLimit()
{
if (limits.speed_limits.max_execution_time != 0
&& info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(limits.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;
return limits.speed_limits.checkTimeLimit(info.total_stopwatch.elapsed(), limits.timeout_overflow_mode);
}

View File

@ -13,7 +13,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
@ -70,7 +70,7 @@ public:
{
const auto it = value_to_name_map.find(value);
if (it == std::end(value_to_name_map))
throw Exception{"Unexpected value " + toString(value) + " for type " + getName(), ErrorCodes::LOGICAL_ERROR};
throw Exception{"Unexpected value " + toString(value) + " for type " + getName(), ErrorCodes::BAD_ARGUMENTS};
return it->second;
}
@ -79,7 +79,7 @@ public:
{
const auto it = name_to_value_map.find(field_name);
if (!it)
throw Exception{"Unknown element '" + field_name.toString() + "' for type " + getName(), ErrorCodes::LOGICAL_ERROR};
throw Exception{"Unknown element '" + field_name.toString() + "' for type " + getName(), ErrorCodes::BAD_ARGUMENTS};
return it->getMapped();
}

View File

@ -331,7 +331,7 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
/// Let's find the type of the first argument (then getActionsImpl will be called again and will not affect anything).
visit(node.arguments->children.at(0), data);
if ((prepared_set = makeSet(node, data, data.no_subqueries)))
if (!data.no_makeset && (prepared_set = makeSet(node, data, data.no_subqueries)))
{
/// Transform tuple or subquery into a set.
}

View File

@ -72,6 +72,7 @@ public:
PreparedSets & prepared_sets;
SubqueriesForSets & subqueries_for_sets;
bool no_subqueries;
bool no_makeset;
bool only_consts;
bool no_storage_or_local;
size_t visit_depth;
@ -80,7 +81,7 @@ public:
Data(const Context & context_, SizeLimits set_size_limit_, size_t subquery_depth_,
const NamesAndTypesList & source_columns_, const ExpressionActionsPtr & actions,
PreparedSets & prepared_sets_, SubqueriesForSets & subqueries_for_sets_,
bool no_subqueries_, bool only_consts_, bool no_storage_or_local_)
bool no_subqueries_, bool no_makeset_, bool only_consts_, bool no_storage_or_local_)
: context(context_),
set_size_limit(set_size_limit_),
subquery_depth(subquery_depth_),
@ -88,6 +89,7 @@ public:
prepared_sets(prepared_sets_),
subqueries_for_sets(subqueries_for_sets_),
no_subqueries(no_subqueries_),
no_makeset(no_makeset_),
only_consts(only_consts_),
no_storage_or_local(no_storage_or_local_),
visit_depth(0),

View File

@ -122,7 +122,7 @@ void ExpressionAnalyzer::analyzeAggregation()
ASTPtr array_join_expression_list = select_query->array_join_expression_list(is_array_join_left);
if (array_join_expression_list)
{
getRootActions(array_join_expression_list, true, temp_actions);
getRootActionsNoMakeSet(array_join_expression_list, true, temp_actions, false);
addMultipleArrayJoinAction(temp_actions, is_array_join_left);
array_join_columns.clear();
@ -134,7 +134,7 @@ void ExpressionAnalyzer::analyzeAggregation()
const ASTTablesInSelectQueryElement * join = select_query->join();
if (join)
{
getRootActions(analyzedJoin().leftKeysList(), true, temp_actions);
getRootActionsNoMakeSet(analyzedJoin().leftKeysList(), true, temp_actions, false);
addJoinAction(temp_actions);
}
}
@ -155,7 +155,7 @@ void ExpressionAnalyzer::analyzeAggregation()
for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i)
{
ssize_t size = group_asts.size();
getRootActions(group_asts[i], true, temp_actions);
getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false);
const auto & column_name = group_asts[i]->getColumnName();
const auto & block = temp_actions->getSampleBlock();
@ -340,7 +340,18 @@ void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_subqueries,
LogAST log;
ActionsVisitor::Data visitor_data(context, settings.size_limits_for_set, subquery_depth,
sourceColumns(), actions, prepared_sets, subqueries_for_sets,
no_subqueries, only_consts, !isRemoteStorage());
no_subqueries, false, only_consts, !isRemoteStorage());
ActionsVisitor(visitor_data, log.stream()).visit(ast);
visitor_data.updateActions(actions);
}
void ExpressionAnalyzer::getRootActionsNoMakeSet(const ASTPtr & ast, bool no_subqueries, ExpressionActionsPtr & actions, bool only_consts)
{
LogAST log;
ActionsVisitor::Data visitor_data(context, settings.size_limits_for_set, subquery_depth,
sourceColumns(), actions, prepared_sets, subqueries_for_sets,
no_subqueries, true, only_consts, !isRemoteStorage());
ActionsVisitor(visitor_data, log.stream()).visit(ast);
visitor_data.updateActions(actions);
}

View File

@ -134,6 +134,12 @@ protected:
void getRootActions(const ASTPtr & ast, bool no_subqueries, ExpressionActionsPtr & actions, bool only_consts = false);
/** Similar to getRootActions but does not make sets when analyzing IN functions. It's used in
* analyzeAggregation, which happens earlier than analyzing PREWHERE and WHERE. If the sets were made there,
* they would not be applicable for the MergeTree index optimization.
*/
void getRootActionsNoMakeSet(const ASTPtr & ast, bool no_subqueries, ExpressionActionsPtr & actions, bool only_consts = false);
/** Add aggregation keys to aggregation_keys, aggregate functions to aggregate_descriptions,
* Create a set of columns aggregated_columns resulting after the aggregation, if any,
* or after all the actions that are normally performed before aggregation.

View File

@ -1,4 +1,5 @@
#include <Common/ThreadStatus.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/Exception.h>
@ -7,11 +8,11 @@
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/ProcessList.h>
#if defined(__linux__)
#include <sys/time.h>
#include <sys/resource.h>
#if defined(OS_LINUX)
# include <Common/hasLinuxCapability.h>
#include <Common/hasLinuxCapability.h>
# include <sys/time.h>
# include <sys/resource.h>
#endif
@ -37,10 +38,13 @@ void ThreadStatus::attachQueryContext(Context & query_context_)
if (thread_group)
{
std::lock_guard lock(thread_group->mutex);
thread_group->query_context = query_context;
if (!thread_group->global_context)
thread_group->global_context = global_context;
}
initQueryProfiler();
}
void CurrentThread::defaultThreadDeleter()
@ -50,40 +54,11 @@ void CurrentThread::defaultThreadDeleter()
current_thread->detachQuery(true, true);
}
void ThreadStatus::initializeQuery()
void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_)
{
assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__);
thread_group = std::make_shared<ThreadGroupStatus>();
performance_counters.setParent(&thread_group->performance_counters);
memory_tracker.setParent(&thread_group->memory_tracker);
thread_group->memory_tracker.setDescription("(for query)");
thread_group->thread_numbers.emplace_back(thread_number);
thread_group->os_thread_ids.emplace_back(os_thread_id);
thread_group->master_thread_number = thread_number;
thread_group->master_thread_os_id = os_thread_id;
initPerformanceCounters();
thread_state = ThreadState::AttachedToQuery;
}
void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached)
{
if (thread_state == ThreadState::AttachedToQuery)
{
if (check_detached)
throw Exception("Can't attach query to the thread, it is already attached", ErrorCodes::LOGICAL_ERROR);
return;
}
assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__);
if (!thread_group_)
throw Exception("Attempt to attach to nullptr thread group", ErrorCodes::LOGICAL_ERROR);
/// Attach current thread to thread group and copy useful information from it
/// Attach or init current thread to thread group and copy useful information from it
thread_group = thread_group_;
performance_counters.setParent(&thread_group->performance_counters);
@ -92,22 +67,23 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
{
std::lock_guard lock(thread_group->mutex);
/// NOTE: thread may be attached multiple times if it is reused from a thread pool.
thread_group->thread_numbers.emplace_back(thread_number);
thread_group->os_thread_ids.emplace_back(os_thread_id);
logs_queue_ptr = thread_group->logs_queue_ptr;
query_context = thread_group->query_context;
if (!global_context)
global_context = thread_group->global_context;
/// NOTE: A thread may be attached multiple times if it is reused from a thread pool.
thread_group->thread_numbers.emplace_back(thread_number);
thread_group->os_thread_ids.emplace_back(os_thread_id);
}
if (query_context)
{
query_id = query_context->getCurrentQueryId();
initQueryProfiler();
#if defined(__linux__)
#if defined(OS_LINUX)
/// Set "nice" value if required.
Int32 new_os_thread_priority = query_context->getSettingsRef().os_thread_priority;
if (new_os_thread_priority && hasLinuxCapability(CAP_SYS_NICE))
@ -123,11 +99,35 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
}
initPerformanceCounters();
initQueryProfiler();
thread_state = ThreadState::AttachedToQuery;
}
void ThreadStatus::initializeQuery()
{
setupState(std::make_shared<ThreadGroupStatus>());
/// No need to lock on mutex here
thread_group->memory_tracker.setDescription("(for query)");
thread_group->master_thread_number = thread_number;
thread_group->master_thread_os_id = os_thread_id;
}
void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached)
{
if (thread_state == ThreadState::AttachedToQuery)
{
if (check_detached)
throw Exception("Can't attach query to the thread, it is already attached", ErrorCodes::LOGICAL_ERROR);
return;
}
if (!thread_group_)
throw Exception("Attempt to attach to nullptr thread group", ErrorCodes::LOGICAL_ERROR);
setupState(thread_group_);
}
void ThreadStatus::finalizePerformanceCounters()
{
if (performance_counters_finalized)

View File

@ -10,6 +10,7 @@
#include <Common/Stopwatch.h>
#include <Processors/ISource.h>
#include <Common/setThreadName.h>
#include <Interpreters/ProcessList.h>
namespace DB
@ -30,12 +31,13 @@ static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception
&& exception.code() != ErrorCodes::QUERY_WAS_CANCELLED;
}
PipelineExecutor::PipelineExecutor(Processors & processors_)
PipelineExecutor::PipelineExecutor(Processors & processors_, QueryStatus * elem)
: processors(processors_)
, cancelled(false)
, finished(false)
, num_processing_executors(0)
, expand_pipeline_task(nullptr)
, process_list_element(elem)
{
buildGraph();
}
@ -472,6 +474,9 @@ void PipelineExecutor::execute(size_t num_threads)
throw;
}
if (process_list_element && process_list_element->isKilled())
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
if (cancelled)
return;

View File

@ -13,6 +13,7 @@
namespace DB
{
class QueryStatus;
/// Executes query pipeline.
class PipelineExecutor
@ -24,7 +25,7 @@ public:
/// During pipeline execution new processors can appear. They will be added to the existing set.
///
/// Explicit graph representation is built in constructor. Throws if graph is not correct.
explicit PipelineExecutor(Processors & processors_);
explicit PipelineExecutor(Processors & processors_, QueryStatus * elem = nullptr);
/// Execute pipeline in multiple threads. Must be called once.
/// If exceptions occurred during execution, throws one of them.
@ -242,6 +243,9 @@ private:
using ProcessorsMap = std::unordered_map<const IProcessor *, UInt64>;
ProcessorsMap processors_map;
/// Now it's used to check if query was killed.
QueryStatus * process_list_element = nullptr;
/// Graph related methods.
bool addEdges(UInt64 node);
void buildGraph();

View File

@ -1,5 +1,6 @@
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Interpreters/ProcessList.h>
#include <stack>
namespace DB
@ -152,6 +153,12 @@ void TreeExecutorBlockInputStream::execute()
case IProcessor::Status::Ready:
{
node->work();
/// This is handled inside PipelineExecutor now,
/// and isn't checked by processors as it was in IBlockInputStream before.
if (process_list_element && process_list_element->isKilled())
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
break;
}
case IProcessor::Status::Async:
@ -188,6 +195,8 @@ void TreeExecutorBlockInputStream::setProgressCallback(const ProgressCallback &
void TreeExecutorBlockInputStream::setProcessListElement(QueryStatus * elem)
{
process_list_element = elem;
for (auto & source : sources_with_progress)
source->setProcessListElement(elem);
}

View File

@ -46,6 +46,8 @@ private:
/// Remember sources that support progress.
std::vector<ISourceWithProgress *> sources_with_progress;
QueryStatus * process_list_element = nullptr;
void init();
/// Execute tree step-by-step until root returns next chunk or execution is finished.
void execute();

View File

@ -222,7 +222,11 @@ public:
/// In case the query was cancelled, the executor will wait till all processors finish their jobs.
/// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o).
bool isCancelled() const { return is_cancelled; }
void cancel() { is_cancelled = true; }
void cancel()
{
is_cancelled = true;
onCancel();
}
virtual ~IProcessor() = default;
@ -275,6 +279,9 @@ public:
void enableQuota() { has_quota = true; }
bool hasQuota() const { return has_quota; }
protected:
virtual void onCancel() {}
private:
std::atomic<bool> is_cancelled{false};
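
cancel() now acts as a template-method hook: it still flips the atomic flag, but also lets the concrete processor react immediately through the virtual onCancel(), which SourceFromInputStream overrides below to cancel its wrapped stream. A minimal standalone sketch of the pattern, with illustrative class names rather than the real hierarchy:

#include <atomic>
#include <iostream>

class Processor
{
public:
    bool isCancelled() const { return is_cancelled; }

    void cancel()
    {
        is_cancelled = true;
        onCancel();                 /// let the concrete processor react immediately
    }

    virtual ~Processor() = default;

protected:
    virtual void onCancel() {}      /// default: nothing extra to do

private:
    std::atomic<bool> is_cancelled{false};
};

class StreamBackedSource : public Processor
{
protected:
    void onCancel() override { std::cout << "cancelling the wrapped stream\n"; }
};

int main()
{
    StreamBackedSource source;
    source.cancel();                                               /// prints the message and sets the flag
    std::cout << std::boolalpha << source.isCancelled() << '\n';   /// true
}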

View File

@ -11,7 +11,7 @@ ISource::ISource(Block header)
ISource::Status ISource::prepare()
{
if (finished)
if (finished || isCancelled())
{
output.finish();
return Status::Finished;
@ -46,7 +46,7 @@ void ISource::work()
try
{
current_chunk.chunk = generate();
if (!current_chunk.chunk)
if (!current_chunk.chunk || isCancelled())
finished = true;
else
has_input = true;

View File

@ -76,7 +76,7 @@ LimitTransform::Status LimitTransform::prepare()
if (!input.hasData())
return Status::NeedData;
current_chunk = input.pull();
current_chunk = input.pull(true);
has_block = true;
auto rows = current_chunk.getNumRows();
@ -95,6 +95,7 @@ LimitTransform::Status LimitTransform::prepare()
}
/// Now, we pulled from input, and it must be empty.
input.setNeeded();
return Status::NeedData;
}
@ -114,6 +115,7 @@ LimitTransform::Status LimitTransform::prepare()
}
/// Now, we pulled from input, and it must be empty.
input.setNeeded();
return Status::NeedData;
}

View File

@ -523,6 +523,8 @@ void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
void QueryPipeline::setProcessListElement(QueryStatus * elem)
{
process_list_element = elem;
for (auto & processor : processors)
{
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
@ -630,7 +632,7 @@ PipelineExecutorPtr QueryPipeline::execute()
if (!output_format)
throw Exception("Cannot execute pipeline because it doesn't have output.", ErrorCodes::LOGICAL_ERROR);
return std::make_shared<PipelineExecutor>(processors);
return std::make_shared<PipelineExecutor>(processors, process_list_element);
}
}

View File

@ -123,6 +123,8 @@ private:
size_t max_threads = 0;
QueryStatus * process_list_element = nullptr;
void checkInitialized();
void checkSource(const ProcessorPtr & source, bool can_have_totals);

View File

@ -35,7 +35,7 @@ IProcessor::Status SourceFromInputStream::prepare()
is_generating_finished = true;
/// Read postfix and get totals if needed.
if (!is_stream_finished)
if (!is_stream_finished && !isCancelled())
return Status::Ready;
if (has_totals_port)
@ -109,7 +109,7 @@ Chunk SourceFromInputStream::generate()
}
auto block = stream->read();
if (!block)
if (!block && !isCancelled())
{
stream->readSuffix();

View File

@ -30,6 +30,9 @@ public:
void setProgressCallback(const ProgressCallback & callback) final { stream->setProgressCallback(callback); }
void addTotalRowsApprox(size_t value) final { stream->addTotalRowsApprox(value); }
protected:
void onCancel() override { stream->cancel(false); }
private:
bool has_aggregate_functions = false;
bool force_add_aggregating_info;

View File

@ -10,6 +10,15 @@ namespace ErrorCodes
{
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
extern const int TIMEOUT_EXCEEDED;
}
void SourceWithProgress::work()
{
if (!limits.speed_limits.checkTimeLimit(total_stopwatch.elapsed(), limits.timeout_overflow_mode))
cancel();
else
ISourceWithProgress::work();
}
/// Aggregated copy-paste from IBlockInputStream::progressImpl.

View File

@ -58,6 +58,8 @@ protected:
/// Call this method to provide information about progress.
void progress(const Progress & value);
void work() override;
private:
LocalLimits limits;
std::shared_ptr<QuotaContext> quota;

View File

@ -17,20 +17,6 @@ namespace ErrorCodes
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
void ProcessorProfileInfo::update(const Chunk & block)
{
++blocks;
@ -44,13 +30,6 @@ LimitsCheckingTransform::LimitsCheckingTransform(const Block & header_, LocalLim
{
}
//LimitsCheckingTransform::LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem)
// : ISimpleTransform(header, header, false)
// , limits(std::move(limits))
// , process_list_elem(process_list_elem)
//{
//}
void LimitsCheckingTransform::transform(Chunk & chunk)
{
if (!info.started)
@ -59,7 +38,7 @@ void LimitsCheckingTransform::transform(Chunk & chunk)
info.started = true;
}
if (!checkTimeLimit())
if (!limits.speed_limits.checkTimeLimit(info.total_stopwatch.elapsed(), limits.timeout_overflow_mode))
{
stopReading();
return;
@ -78,18 +57,6 @@ void LimitsCheckingTransform::transform(Chunk & chunk)
}
}
bool LimitsCheckingTransform::checkTimeLimit()
{
if (limits.speed_limits.max_execution_time != 0
&& info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.speed_limits.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(limits.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(limits.speed_limits.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
void LimitsCheckingTransform::checkQuota(Chunk & chunk)
{
switch (limits.mode)

View File

@ -29,10 +29,7 @@ public:
using LocalLimits = IBlockInputStream::LocalLimits;
using LimitsMode = IBlockInputStream::LimitsMode;
/// LIMITS_CURRENT
LimitsCheckingTransform(const Block & header_, LocalLimits limits_);
/// LIMITS_TOTAL
/// LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem);
String getName() const override { return "LimitsCheckingTransform"; }

View File

@ -117,6 +117,9 @@ public:
/// Returns true if the blocks shouldn't be pushed to associated views on insert.
virtual bool noPushingToViews() const { return false; }
/// A read query returns streams which automatically distribute data between themselves.
/// So, it's impossible for one stream to run out of data while there is still data in other streams.
/// An example is StorageSystemNumbers.
virtual bool hasEvenlyDistributedRead() const { return false; }
/// Optional size information of each physical column.

View File

@ -5,6 +5,9 @@
#include <DataStreams/LimitBlockInputStream.h>
#include <Storages/System/StorageSystemNumbers.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#include <Processors/LimitTransform.h>
namespace DB
{
@ -12,21 +15,16 @@ namespace DB
namespace
{
class NumbersBlockInputStream : public IBlockInputStream
class NumbersSource : public SourceWithProgress
{
public:
NumbersBlockInputStream(UInt64 block_size_, UInt64 offset_, UInt64 step_)
: block_size(block_size_), next(offset_), step(step_) {}
NumbersSource(UInt64 block_size_, UInt64 offset_, UInt64 step_)
: SourceWithProgress(createHeader()), block_size(block_size_), next(offset_), step(step_) {}
String getName() const override { return "Numbers"; }
Block getHeader() const override
{
return { ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number") };
}
protected:
Block readImpl() override
Chunk generate() override
{
auto column = ColumnUInt64::create(block_size);
ColumnUInt64::Container & vec = column->getData();
@ -38,12 +36,21 @@ protected:
*pos++ = curr++;
next += step;
return { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeUInt64>(), "number") };
progress({column->size(), column->byteSize()});
return { Columns {std::move(column)}, block_size };
}
private:
UInt64 block_size;
UInt64 next;
UInt64 step;
static Block createHeader()
{
return { ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number") };
}
};
@ -55,21 +62,19 @@ struct NumbersMultiThreadedState
using NumbersMultiThreadedStatePtr = std::shared_ptr<NumbersMultiThreadedState>;
class NumbersMultiThreadedBlockInputStream : public IBlockInputStream
class NumbersMultiThreadedSource : public SourceWithProgress
{
public:
NumbersMultiThreadedBlockInputStream(NumbersMultiThreadedStatePtr state_, UInt64 block_size_, UInt64 max_counter_)
: state(std::move(state_)), block_size(block_size_), max_counter(max_counter_) {}
NumbersMultiThreadedSource(NumbersMultiThreadedStatePtr state_, UInt64 block_size_, UInt64 max_counter_)
: SourceWithProgress(createHeader())
, state(std::move(state_))
, block_size(block_size_)
, max_counter(max_counter_) {}
String getName() const override { return "NumbersMt"; }
Block getHeader() const override
{
return { ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number") };
}
protected:
Block readImpl() override
Chunk generate() override
{
if (block_size == 0)
return {};
@ -90,7 +95,9 @@ protected:
while (pos < end)
*pos++ = curr++;
return { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeUInt64>(), "number") };
progress({column->size(), column->byteSize()});
return { Columns {std::move(column)}, block_size };
}
private:
@ -98,6 +105,11 @@ private:
UInt64 block_size;
UInt64 max_counter;
Block createHeader() const
{
return { ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number") };
}
};
}
@ -109,7 +121,7 @@ StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multi
setColumns(ColumnsDescription({{"number", std::make_shared<DataTypeUInt64>()}}));
}
BlockInputStreams StorageSystemNumbers::read(
Pipes StorageSystemNumbers::readWithProcessors(
const Names & column_names,
const SelectQueryInfo &,
const Context & /*context*/,
@ -128,7 +140,8 @@ BlockInputStreams StorageSystemNumbers::read(
if (!multithreaded)
num_streams = 1;
BlockInputStreams res(num_streams);
Pipes res;
res.reserve(num_streams);
if (num_streams > 1 && !even_distribution && *limit)
{
@ -136,17 +149,26 @@ BlockInputStreams StorageSystemNumbers::read(
UInt64 max_counter = offset + *limit;
for (size_t i = 0; i < num_streams; ++i)
res[i] = std::make_shared<NumbersMultiThreadedBlockInputStream>(state, max_block_size, max_counter);
res.emplace_back(std::make_shared<NumbersMultiThreadedSource>(state, max_block_size, max_counter));
return res;
}
for (size_t i = 0; i < num_streams; ++i)
{
res[i] = std::make_shared<NumbersBlockInputStream>(max_block_size, offset + i * max_block_size, num_streams * max_block_size);
auto source = std::make_shared<NumbersSource>(max_block_size, offset + i * max_block_size, num_streams * max_block_size);
if (limit) /// This formula shows how to split 'limit' elements into 'num_streams' chunks almost uniformly.
res[i] = std::make_shared<LimitBlockInputStream>(res[i], *limit * (i + 1) / num_streams - *limit * i / num_streams, 0, false, true);
if (limit && i == 0)
source->addTotalRowsApprox(*limit);
res.emplace_back(std::move(source));
if (limit)
{
/// This formula shows how to split 'limit' elements into 'num_streams' chunks almost uniformly.
res.back().addSimpleTransform(std::make_shared<LimitTransform>(
res.back().getHeader(), *limit * (i + 1) / num_streams - *limit * i / num_streams, 0, false));
}
}
return res;
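
The formula used above, limit * (i + 1) / num_streams - limit * i / num_streams, is the usual integer-division trick for almost-uniform splitting: the per-stream chunks telescope to exactly limit in total and differ by at most one row. A small standalone check of that property (the concrete numbers are arbitrary):

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iostream>

int main()
{
    const std::uint64_t limit = 17;
    const std::uint64_t num_streams = 5;

    std::uint64_t total = 0;
    std::uint64_t min_chunk = limit;
    std::uint64_t max_chunk = 0;

    for (std::uint64_t i = 0; i < num_streams; ++i)
    {
        /// Rows assigned to stream i, as in the hunk above.
        std::uint64_t chunk = limit * (i + 1) / num_streams - limit * i / num_streams;
        total += chunk;
        min_chunk = std::min(min_chunk, chunk);
        max_chunk = std::max(max_chunk, chunk);
        std::cout << "stream " << i << ": " << chunk << " rows\n";
    }

    assert(total == limit);              /// the chunks telescope to the full limit
    assert(max_chunk - min_chunk <= 1);  /// and are almost uniform
}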

View File

@ -29,7 +29,7 @@ class StorageSystemNumbers : public ext::shared_ptr_helper<StorageSystemNumbers>
public:
std::string getName() const override { return "SystemNumbers"; }
BlockInputStreams read(
Pipes readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -37,6 +37,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
bool hasEvenlyDistributedRead() const override { return true; }
private:

View File

@ -107,7 +107,7 @@ def get_stacktraces(server_pid):
def get_server_pid(server_tcp_port):
cmd = "lsof -i tcp:{port} -s tcp:LISTEN -Fp | gawk '/^p[0-9]+$/{{print substr($0, 2)}}'".format(port=server_tcp_port)
cmd = "lsof -i tcp:{port} -s tcp:LISTEN -Fp | awk '/^p[0-9]+$/{{print substr($0, 2)}}'".format(port=server_tcp_port)
try:
output = subprocess.check_output(cmd, shell=True)
if output:

View File

@ -162,35 +162,48 @@ def test_inserts_local(started_cluster):
assert instance.query("SELECT count(*) FROM local").strip() == '1'
def test_prefer_localhost_replica(started_cluster):
test_query = "SELECT * FROM distributed ORDER BY id;"
test_query = "SELECT * FROM distributed ORDER BY id"
node1.query("INSERT INTO distributed VALUES (toDate('2017-06-17'), 11)")
node2.query("INSERT INTO distributed VALUES (toDate('2017-06-17'), 22)")
time.sleep(1.0)
expected_distributed = '''\
2017-06-17\t11
2017-06-17\t22
'''
assert TSV(node1.query(test_query)) == TSV(expected_distributed)
assert TSV(node2.query(test_query)) == TSV(expected_distributed)
with PartitionManager() as pm:
pm.partition_instances(node1, node2, action='REJECT --reject-with tcp-reset')
node1.query("INSERT INTO replicated VALUES (toDate('2017-06-17'), 33)")
node2.query("INSERT INTO replicated VALUES (toDate('2017-06-17'), 44)")
time.sleep(1.0)
expected_from_node2 = '''\
2017-06-17\t11
2017-06-17\t22
2017-06-17\t44
'''
# Query is sent to node2, as it is local and prefer_localhost_replica=1
assert TSV(node2.query(test_query)) == TSV(expected_from_node2)
expected_from_node1 = '''\
2017-06-17\t11
2017-06-17\t22
2017-06-17\t33
'''
assert TSV(node1.query(test_query)) == TSV(expected_distributed)
assert TSV(node2.query(test_query)) == TSV(expected_distributed)
# Make replicas inconsistent by disabling merges and fetches
# so that it is possible to determine which replica the query was sent to
node1.query("SYSTEM STOP MERGES")
node1.query("SYSTEM STOP FETCHES")
node2.query("SYSTEM STOP MERGES")
node2.query("SYSTEM STOP FETCHES")
node1.query("INSERT INTO replicated VALUES (toDate('2017-06-17'), 33)")
node2.query("INSERT INTO replicated VALUES (toDate('2017-06-17'), 44)")
time.sleep(1.0)
# Query is sent to node2, as it is local and prefer_localhost_replica=1
assert TSV(node2.query(test_query)) == TSV(expected_from_node2)
# Now the query is sent to node1, as it is higher in the order
assert TSV(node2.query("SET load_balancing='in_order'; SET prefer_localhost_replica=0;" + test_query)) == TSV(expected_from_node1)
assert TSV(node2.query(test_query + " SETTINGS load_balancing='in_order', prefer_localhost_replica=0")) == TSV(expected_from_node1)
def test_inserts_low_cardinality(started_cluster):
instance = shard1

View File

@ -11,4 +11,6 @@ INSERT INTO cast_enums SELECT 2 AS type, toDate('2017-01-01') AS date, number AS
SELECT type, date, id FROM cast_enums ORDER BY type, id;
INSERT INTO cast_enums VALUES ('wrong_value', '2017-01-02', 7); -- { clientError 36 }
DROP TABLE IF EXISTS cast_enums;

View File

@ -0,0 +1 @@
0

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS testmt;
CREATE TABLE testmt (`CounterID` UInt64, `value` String) ENGINE = MergeTree() ORDER BY CounterID;
INSERT INTO testmt VALUES (1, '1'), (2, '2');
SELECT arrayJoin([CounterID NOT IN (2)]) AS counter FROM testmt WHERE CounterID IN (2) GROUP BY counter;
DROP TABLE testmt;

View File

@ -33,6 +33,7 @@ RUN locale-gen en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US:en
ENV LC_ALL en_US.UTF-8
ENV TZ UTC
RUN mkdir /docker-entrypoint-initdb.d

View File

@ -87,15 +87,15 @@ params['header'] = "ClickHouse Performance Comparison"
params['test_part'] = (table_template.format_map(
collections.defaultdict(str,
caption = 'Changes in performance',
header = table_header(['Left', 'Right', 'Diff', 'RD', 'Query']),
header = table_header(['Old, s', 'New, s', 'Relative difference (new&nbsp;-&nbsp;old)/old', 'Randomization distribution quantiles [5%,&nbsp;50%,&nbsp;95%]', 'Query']),
rows = tsv_rows('changed-perf.tsv'))) +
table_template.format(
caption = 'Slow on client',
header = table_header(['Client', 'Server', 'Ratio', 'Query']),
header = table_header(['Client time, s', 'Server time, s', 'Ratio', 'Query']),
rows = tsv_rows('slow-on-client.tsv')) +
table_template.format(
caption = 'Unstable',
header = table_header(['Left', 'Right', 'Diff', 'RD', 'Query']),
header = table_header(['Old, s', 'New, s', 'Relative difference (new&nbsp;-&nbsp;old)/old', 'Randomization distribution quantiles [5%,&nbsp;50%,&nbsp;95%]', 'Query']),
rows = tsv_rows('unstable.tsv')) +
table_template.format(
caption = 'Run errors',

View File

@ -368,7 +368,7 @@ For more information, see the section "[Creating replicated tables](../../operat
## mark_cache_size {#server-mark-cache-size}
Approximate size (in bytes) of the cache of marks used by table engines of the [MergeTree](../../operations/table_engines/mergetree.md) family.
Approximate size (in bytes) of the cache of marks used by table engines of the [MergeTree](../table_engines/mergetree.md) family.
The cache is shared for the server and memory is allocated as needed. The cache size must be at least 5368709120.
@ -420,7 +420,7 @@ We recommend using this option in Mac OS X, since the `getrlimit()` function ret
Restriction on deleting tables.
If the size of a [MergeTree](../../operations/table_engines/mergetree.md) table exceeds `max_table_size_to_drop` (in bytes), you can't delete it using a DROP query.
If the size of a [MergeTree](../table_engines/mergetree.md) table exceeds `max_table_size_to_drop` (in bytes), you can't delete it using a DROP query.
If you still need to delete the table without restarting the ClickHouse server, create the `<clickhouse-path>/flags/force_drop_table` file and run the DROP query.
@ -437,7 +437,7 @@ The value 0 means that you can delete all tables without any restrictions.
## merge_tree {#server_settings-merge_tree}
Fine tuning for tables in the [MergeTree](../../operations/table_engines/mergetree.md).
Fine tuning for tables in the [MergeTree](../table_engines/mergetree.md).
For more information, see the MergeTreeSettings.h header file.
@ -512,7 +512,7 @@ Keys for server/client settings:
## part_log {#server_settings-part-log}
Logging events that are associated with [MergeTree](../../operations/table_engines/mergetree.md). For instance, adding or merging data. You can use the log to simulate merge algorithms and compare their characteristics. You can visualize the merge process.
Logging events that are associated with [MergeTree](../table_engines/mergetree.md). For instance, adding or merging data. You can use the log to simulate merge algorithms and compare their characteristics. You can visualize the merge process.
Queries are logged in the [system.part_log](../system_tables.md#system_tables-part-log) table, not in a separate file. You can configure the name of this table in the `table` parameter (see below).
@ -739,7 +739,7 @@ Path to temporary data for processing large queries.
## tmp_policy {#server-settings-tmp_policy}
Policy from [`storage_configuration`](mergetree.md#table_engine-mergetree-multiple-volumes) to store temporary files.
Policy from [`storage_configuration`](../table_engines/mergetree.md#table_engine-mergetree-multiple-volumes) to store temporary files.
If not set [`tmp_path`](#server-settings-tmp_path) is used, otherwise it is ignored.
!!! note
@ -750,7 +750,7 @@ If not set [`tmp_path`](#server-settings-tmp_path) is used, otherwise it is igno
## uncompressed_cache_size {#server-settings-uncompressed_cache_size}
Cache size (in bytes) for uncompressed data used by table engines from the [MergeTree](../../operations/table_engines/mergetree.md).
Cache size (in bytes) for uncompressed data used by table engines from the [MergeTree](../table_engines/mergetree.md).
There is one shared cache for the server. Memory is allocated on demand. The cache is used if the option [use_uncompressed_cache](../settings/settings.md#setting-use_uncompressed_cache) is enabled.

View File

@ -805,9 +805,9 @@ WHERE name in ('Kafka', 'MergeTree', 'ReplicatedCollapsingMergeTree')
**See also**
- MergeTree family [query clauses](table_engines/mergetree/#sektsii-zaprosa)
- MergeTree family [query clauses](table_engines/mergetree.md#mergetree-query-clauses)
- Kafka [settings](table_engines/kafka.md#table_engine-kafka-creating-a-table)
- Join [settings](table_engines/join/#limitations-and-settings)
- Join [settings](table_engines/join.md#join-limitations-and-settings)
## system.tables

View File

@ -14,7 +14,7 @@ The Distributed engine accepts parameters:
See also:
- `insert_distributed_sync` setting
- [MergeTree](../mergetree.md#table_engine-mergetree-multiple-volumes) for the examples
- [MergeTree](mergetree.md#table_engine-mergetree-multiple-volumes) for the examples
Example:

View File

@ -77,7 +77,7 @@ You cannot perform a `SELECT` query directly from the table. Instead, use one of
- Place the table to the right side in a `JOIN` clause.
- Call the [joinGet](../../query_language/functions/other_functions.md#other_functions-joinget) function, which lets you extract data from the table the same way as from a dictionary.
### Limitations and Settings
### Limitations and Settings {#join-limitations-and-settings}
When creating a table, the following settings are applied:

View File

@ -50,7 +50,7 @@ For a description of parameters, see the [CREATE query description](../../query_
!!!note "Note"
`INDEX` is an experimental feature, see [Data Skipping Indexes](#table_engine-mergetree-data_skipping-indexes).
### Query Clauses
### Query Clauses {#mergetree-query-clauses}
- `ENGINE` — Name and parameters of the engine. `ENGINE = MergeTree()`. The `MergeTree` engine does not have parameters.

View File

@ -2,8 +2,8 @@
## Q1 2020
- Resource pools for more precise distribution of cluster capacity between users
- Fine-grained authorization
- Role-based access control
- Integration with external authentication services
- Resource pools for more precise distribution of cluster capacity between users
[Original article](https://clickhouse.yandex/docs/en/roadmap/) <!--hide-->

View File

@ -727,9 +727,9 @@ WHERE name in ('Kafka', 'MergeTree', 'ReplicatedCollapsingMergeTree')
**See also**
- MergeTree family [engine clauses](table_engines/mergetree/#sektsii-zaprosa)
- MergeTree family [engine clauses](table_engines/mergetree/#mergetree-query-clauses)
- Kafka [settings](table_engines/kafka.md#table_engine-kafka-creating-a-table)
- Join [settings](table_engines/join/#limitations-and-settings)
- Join [settings](table_engines/join/#join-limitations-and-settings)
## system.tables

View File

@ -79,7 +79,7 @@ SELECT joinGet('id_val_join', 'val', toUInt32(1))
- Place the table on the right side in a `JOIN` clause.
- Call the [joinGet](../../query_language/functions/other_functions.md#other_functions-joinget) function, which lets you extract data from the table the same way as from a dictionary.
### Limitations and Settings
### Limitations and Settings {#join-limitations-and-settings}
When creating a table, the following settings are applied:

View File

@ -49,7 +49,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
!!!note "Note"
`INDEX` is an experimental feature, see [Data Skipping Indexes](#table_engine-mergetree-data_skipping-indexes).
### Query Clauses
### Query Clauses {#mergetree-query-clauses}
- `ENGINE` — name and parameters of the engine. `ENGINE = MergeTree()`. The `MergeTree` engine does not have parameters.

View File

@ -11,7 +11,6 @@ import subprocess
import sys
import time
import markdown.extensions
import markdown.util
from mkdocs import config
@ -197,12 +196,46 @@ def build_single_page_version(lang, args, cfg):
shutil.copytree(test_dir, args.save_raw_single_page)
def write_redirect_html(out_path, to_url):
with open(out_path, 'w') as f:
f.write('''<!DOCTYPE HTML>
<html lang="en-US">
<head>
<meta charset="UTF-8">
<meta http-equiv="refresh" content="0; url=%s">
<script type="text/javascript">
window.location.href = "%s"
</script>
<title>Page Redirection</title>
</head>
<body>
If you are not redirected automatically, follow this <a href="%s">link</a>.
</body>
</html>''' % (to_url, to_url, to_url))
def build_redirect_html(args, from_path, to_path):
for lang in args.lang.split(','):
out_path = os.path.join(args.docs_output_dir, lang, from_path.replace('.md', '/index.html'))
out_dir = os.path.dirname(out_path)
try:
os.makedirs(out_dir)
except OSError:
pass
version_prefix = args.version_prefix + '/' if args.version_prefix else '/'
to_url = '/docs%s%s/%s' % (version_prefix, lang, to_path.replace('.md', '/'))
to_url = to_url.strip()
write_redirect_html(out_path, to_url)
def build_redirects(args):
lang_re_fragment = args.lang.replace(',', '|')
rewrites = []
with open(os.path.join(args.docs_dir, 'redirects.txt'), 'r') as f:
for line in f:
from_path, to_path = line.split(' ', 1)
build_redirect_html(args, from_path, to_path)
from_path = '^/docs/(' + lang_re_fragment + ')/' + from_path.replace('.md', '/?') + '$'
to_path = '/docs/$1/' + to_path.replace('.md', '/')
rewrites.append(' '.join(['rewrite', from_path, to_path, 'permanent;']))
@ -212,8 +245,11 @@ def build_redirects(args):
def build_docs(args):
tasks = []
for lang in args.lang.split(','):
build_for_lang(lang, args)
tasks.append((lang, args,))
util.run_function_in_parallel(build_for_lang, tasks, threads=True)
build_redirects(args)
def build(args):
@ -228,11 +264,14 @@ def build(args):
from github import build_releases
build_releases(args, build_docs)
build_redirects(args)
if not args.skip_website:
minify_website(args)
write_redirect_html(
os.path.join(args.output_dir, 'tutorial.html'),
'/docs/en/getting_started/tutorial/'
)
if __name__ == '__main__':
os.chdir(os.path.join(os.path.dirname(__file__), '..'))

View File

@ -3,6 +3,7 @@ import copy
import io
import logging
import os
import sys
import tarfile
import requests
@ -18,17 +19,22 @@ def choose_latest_releases():
candidates += requests.get(url).json()
for tag in candidates:
name = tag.get('name', '')
is_unstable = ('stable' not in name) and ('lts' not in name)
is_in_blacklist = ('v18' in name) or ('prestable' in name) or ('v1.1' in name)
if is_unstable or is_in_blacklist:
continue
major_version = '.'.join((name.split('.', 2))[:2])
if major_version not in seen:
seen[major_version] = (name, tag.get('tarball_url'),)
if len(seen) > 10:
break
if isinstance(tag, dict):
name = tag.get('name', '')
is_unstable = ('stable' not in name) and ('lts' not in name)
is_in_blacklist = ('v18' in name) or ('prestable' in name) or ('v1.1' in name)
if is_unstable or is_in_blacklist:
continue
major_version = '.'.join((name.split('.', 2))[:2])
if major_version not in seen:
seen[major_version] = (name, tag.get('tarball_url'),)
if len(seen) > 10:
break
else:
logging.fatal('Unexpected GitHub response: %s', str(candidates))
sys.exit(1)
logging.info('Found stable releases: %s', str(seen.keys()))
return seen.items()
@ -47,6 +53,7 @@ def process_release(args, callback, release):
def build_releases(args, callback):
tasks = []
for release in args.stable_releases:
process_release(args, callback, release)

View File

@ -3,7 +3,11 @@ set -ex
BASE_DIR=$(dirname $(readlink -f $0))
BUILD_DIR="${BASE_DIR}/../build"
PUBLISH_DIR="${BASE_DIR}/../publish"
IMAGE="clickhouse/website"
GIT_TEST_URI="git@github.com:ClickHouse/clickhouse-test.github.io.git"
GIT_PROD_URI="git@github.com:ClickHouse/clickhouse.github.io.git"
if [[ -z "$1" ]]
then
TAG=$(head -c 8 /dev/urandom | xxd -p)
@ -17,11 +21,41 @@ if [[ -z "$1" ]]
then
source "${BASE_DIR}/venv/bin/activate"
python "${BASE_DIR}/build.py" "--enable-stable-releases"
rm -rf "${PUBLISH_DIR}" || true
git clone "${GIT_TEST_URI}" "${PUBLISH_DIR}"
cd "${PUBLISH_DIR}"
git config user.email "robot-clickhouse@yandex-team.ru"
git config user.name "robot-clickhouse"
git rm -rf *
git commit -a -m "wipe old release"
cp -R "${BUILD_DIR}"/* .
echo -n "test.clickhouse.tech" > CNAME
echo -n "" > README.md
cp "${BASE_DIR}/../../LICENSE" .
git add *
git commit -a -m "add new release at $(date)"
git push origin master
cd "${BUILD_DIR}"
docker build -t "${FULL_NAME}" "${BUILD_DIR}"
docker tag "${FULL_NAME}" "${REMOTE_NAME}"
DOCKER_HASH=$(docker push "${REMOTE_NAME}" | tail -1 | awk '{print $3;}')
docker rmi "${FULL_NAME}"
else
rm -rf "${BUILD_DIR}" || true
rm -rf "${PUBLISH_DIR}" || true
git clone "${GIT_TEST_URI}" "${BUILD_DIR}"
git clone "${GIT_PROD_URI}" "${PUBLISH_DIR}"
cd "${PUBLISH_DIR}"
git config user.email "robot-clickhouse@yandex-team.ru"
git config user.name "robot-clickhouse"
git rm -rf *
git commit -a -m "wipe old release"
rm -rf "${BUILD_DIR}/.git"
cp -R "${BUILD_DIR}"/* .
echo -n "clickhouse.tech" > CNAME
git add *
git commit -a -m "add new release at $(date)"
git push origin master
fi
QLOUD_ENDPOINT="https://platform.yandex-team.ru/api/v1"

View File

@ -1,7 +1,9 @@
import contextlib
import multiprocessing
import os
import shutil
import tempfile
import threading
@contextlib.contextmanager
@ -20,3 +22,13 @@ def autoremoved_file(path):
yield handle
finally:
os.unlink(path)
def run_function_in_parallel(func, args_list, threads=False):
processes = []
for task in args_list:
cls = threading.Thread if threads else multiprocessing.Process
processes.append(cls(target=func, args=task))
processes[-1].start()
for process in processes:
process.join()

View File

@ -15,5 +15,16 @@ Join(ANY|ALL, LEFT|INNER, k1[, k2, ...])
Like the Set engine, the Join engine stores data on disk.
### Limitations and Settings {#join-limitations-and-settings}
When creating a table, the following settings are applied:
- join_use_nulls
- max_rows_in_join
- max_bytes_in_join
- join_overflow_mode
- join_any_take_last_row
The `Join`-engine tables can't be used in `GLOBAL JOIN` operations.
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/join/) <!--hide-->

View File

@ -8,6 +8,8 @@ Kafka features:
- Fault-tolerant storage.
- Stream processing.
<a name="table_engine-kafka-creating-a-table"></a>
Old format:
```

View File

@ -46,6 +46,8 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
For a description of the parameters, see the [CREATE query description](../../query_language/create.md).
<a name="mergetree-query-clauses"></a>
**Clauses**
- `ENGINE` - Name and parameters of the engine. `ENGINE = MergeTree()`. The `MergeTree` engine does not have parameters.
@ -270,7 +272,7 @@ SELECT count() FROM table WHERE s < 'z'
SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
```
#### Available Types of Indices
#### Available Types of Indices {#table_engine-mergetree-data_skipping-indexes}
* `minmax`
Stores the extremes of the specified expression (if the expression is a `tuple`, then the extremes of each element of the `tuple`); this information is used for skipping blocks of data, similar to the primary key.

View File

@ -32,7 +32,7 @@
<a class="menu_item" href="#quick-start">Quick Start</a>
<a class="menu_item" href="#performance">Performance</a>
<a class="menu_item" href="docs/en/">Documentation</a>
<a class="menu_item" href="blog/en/">Blog</a>
<a class="menu_item" href="https://clickhouse.yandex/blog/en/">Blog</a>
<a class="menu_item" href="#contacts">Contacts</a>
</div>
@ -398,7 +398,7 @@
<p>System requirements: Linux, x86_64 with SSE 4.2.</p>
<p>Install packages for <span class="distributive_selected" id="repo_deb">Ubuntu/Debian</span> or <span class="distributive_not_selected" id="repo_rpm">CentOS/RedHat</span> or <span class="distributive_not_selected" id="repo_tgz">Other Linux</span>:</p>
<p>Install packages for <span class="distributive_selected" id="repo_deb">Ubuntu/Debian</span>, <span class="distributive_not_selected" id="repo_rpm">CentOS/RedHat</span> or <span class="distributive_not_selected" id="repo_tgz">other Linux</span>:</p>
<code id="packages-install">
<pre id="instruction_deb">
@ -455,7 +455,7 @@ sudo clickhouse-client-$LATEST_VERSION/install/doinst.sh
Yandex Managed Service for ClickHouse</a>.
</p>
<p>After you got connected to your ClickHouse server, you can proceed to <strong><a href="tutorial.html">tutorial</a></strong> or <strong><a href="docs/en/">full
<p>After you got connected to your ClickHouse server, you can proceed to <strong><a href="/docs/en/getting_started/tutorial/">tutorial</a></strong> or <strong><a href="docs/en/">full
documentation</a></strong>.</p>
<h2 id="contacts">Contacts</h2>
@ -567,7 +567,7 @@ sudo clickhouse-client-$LATEST_VERSION/install/doinst.sh
});
var hostParts = window.location.host.split('.');
if (hostParts.length > 2 && hostParts[0] != 'test') {
if (hostParts.length > 2 && hostParts[0] != 'test' && hostParts[1] != 'github') {
window.location.host = hostParts[0] + '.' + hostParts[1];
}