Merge branch 'master' into dictionaries-dont-check-is-modified-under-lock

This commit is contained in:
Alexey Milovidov 2019-03-07 02:46:27 +03:00
commit 58bfcba361
66 changed files with 1160 additions and 725 deletions

View File

@ -59,6 +59,12 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
if (NOT CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wextra-semi-stmt -Wshadow-field -Wstring-plus-int")
endif ()
if (NOT CMAKE_CXX_COMPILER_VERSION VERSION_LESS 9)
if (WEVERYTHING)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-ctad-maybe-unsupported")
endif ()
endif ()
endif ()
if (USE_DEBUG_HELPERS)
@ -200,7 +206,7 @@ target_link_libraries (clickhouse_common_io
PRIVATE
${CMAKE_DL_LIBS}
PUBLIC
roaring
roaring
)

View File

@ -1,5 +1,5 @@
set(CLICKHOUSE_COPIER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopier.cpp)
set(CLICKHOUSE_COPIER_LINK PRIVATE clickhouse_functions clickhouse_aggregate_functions daemon)
set(CLICKHOUSE_COPIER_LINK PRIVATE clickhouse_functions clickhouse_table_functions clickhouse_aggregate_functions daemon)
#set(CLICKHOUSE_COPIER_INCLUDE SYSTEM PRIVATE ...)
clickhouse_program_add(copier)

View File

@ -602,6 +602,8 @@ void HTTPHandler::processQuery(
});
}
customizeContext(context);
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("Query-Id", current_query_id); });

View File

@ -28,6 +28,9 @@ public:
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
/// This method is called right before the query execution.
virtual void customizeContext(DB::Context& /* context */) {}
private:
struct Output
{

View File

@ -122,7 +122,7 @@ void TCPHandler::runImpl()
while (1)
{
/// Restore context of request.
/// Set context of request.
query_context = connection_context;
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
@ -158,22 +158,22 @@ void TCPHandler::runImpl()
if (!receivePacket())
continue;
query_scope.emplace(query_context);
query_scope.emplace(*query_context);
send_exception_with_stack_trace = query_context.getSettingsRef().calculate_text_stack_trace;
send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;
/// Should we send internal logs to client?
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
&& query_context.getSettingsRef().send_logs_level.value != LogsLevel::none)
&& query_context->getSettingsRef().send_logs_level.value != LogsLevel::none)
{
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
state.logs_queue->max_priority = Poco::Logger::parseLevel(query_context.getSettingsRef().send_logs_level.toString());
state.logs_queue->max_priority = Poco::Logger::parseLevel(query_context->getSettingsRef().send_logs_level.toString());
CurrentThread::attachInternalTextLogsQueue(state.logs_queue);
}
query_context.setExternalTablesInitializer([&global_settings, this] (Context & context)
query_context->setExternalTablesInitializer([&global_settings, this] (Context & context)
{
if (&context != &query_context)
if (&context != &*query_context)
throw Exception("Unexpected context in external tables initializer", ErrorCodes::LOGICAL_ERROR);
/// Get blocks of temporary tables
@ -185,9 +185,11 @@ void TCPHandler::runImpl()
state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.
});
customizeContext(*query_context);
bool may_have_embedded_data = client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;
/// Processing Query
state.io = executeQuery(state.query, query_context, false, state.stage, may_have_embedded_data);
state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
if (state.io.out)
state.need_receive_data_for_insert = true;
@ -293,6 +295,9 @@ void TCPHandler::runImpl()
LOG_INFO(log, std::fixed << std::setprecision(3)
<< "Processed in " << watch.elapsedSeconds() << " sec.");
/// It is important to destroy query context here. We do not want it to live arbitrarily longer than the query.
query_context.reset();
if (network_error)
break;
}
@ -301,7 +306,7 @@ void TCPHandler::runImpl()
void TCPHandler::readData(const Settings & global_settings)
{
const auto receive_timeout = query_context.getSettingsRef().receive_timeout.value;
const auto receive_timeout = query_context->getSettingsRef().receive_timeout.value;
/// Poll interval should not be greater than receive_timeout
const size_t default_poll_interval = global_settings.poll_interval.value * 1000000;
@ -364,8 +369,8 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
/// Send ColumnsDescription for insertion table
if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA)
{
const auto & db_and_table = query_context.getInsertionTable();
if (auto * columns = ColumnsDescription::loadFromContext(query_context, db_and_table.first, db_and_table.second))
const auto & db_and_table = query_context->getInsertionTable();
if (auto * columns = ColumnsDescription::loadFromContext(*query_context, db_and_table.first, db_and_table.second))
sendTableColumns(*columns);
}
@ -408,7 +413,7 @@ void TCPHandler::processOrdinaryQuery()
}
else
{
if (after_send_progress.elapsed() / 1000 >= query_context.getSettingsRef().interactive_delay)
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
{
/// Some time passed and there is a progress.
after_send_progress.restart();
@ -417,7 +422,7 @@ void TCPHandler::processOrdinaryQuery()
sendLogs();
if (async_in.poll(query_context.getSettingsRef().interactive_delay / 1000))
if (async_in.poll(query_context->getSettingsRef().interactive_delay / 1000))
{
/// There is the following result block.
block = async_in.read();
@ -645,11 +650,11 @@ void TCPHandler::receiveQuery()
state.is_empty = false;
readStringBinary(state.query_id, *in);
query_context.setCurrentQueryId(state.query_id);
query_context->setCurrentQueryId(state.query_id);
/// Client info
{
ClientInfo & client_info = query_context.getClientInfo();
ClientInfo & client_info = query_context->getClientInfo();
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
client_info.read(*in, client_revision);
@ -677,7 +682,7 @@ void TCPHandler::receiveQuery()
}
/// Per query settings.
Settings & settings = query_context.getSettingsRef();
Settings & settings = query_context->getSettingsRef();
settings.deserialize(*in);
/// Sync timeouts on client and server during current query to avoid dangling queries on server
@ -715,16 +720,16 @@ bool TCPHandler::receiveData()
{
StoragePtr storage;
/// If such a table does not exist, create it.
if (!(storage = query_context.tryGetExternalTable(external_table_name)))
if (!(storage = query_context->tryGetExternalTable(external_table_name)))
{
NamesAndTypesList columns = block.getNamesAndTypesList();
storage = StorageMemory::create(external_table_name,
ColumnsDescription{columns, NamesAndTypesList{}, NamesAndTypesList{}, ColumnDefaults{}, ColumnComments{}, ColumnCodecs{}});
storage->startup();
query_context.addExternalTable(external_table_name, storage);
query_context->addExternalTable(external_table_name, storage);
}
/// The data will be written directly to the table.
state.io.out = storage->write(ASTPtr(), query_context);
state.io.out = storage->write(ASTPtr(), *query_context);
}
if (block)
state.io.out->write(block);
@ -763,10 +768,10 @@ void TCPHandler::initBlockOutput(const Block & block)
{
if (!state.maybe_compressed_out)
{
std::string method = query_context.getSettingsRef().network_compression_method;
std::string method = query_context->getSettingsRef().network_compression_method;
std::optional<int> level;
if (method == "ZSTD")
level = query_context.getSettingsRef().network_zstd_compression_level;
level = query_context->getSettingsRef().network_zstd_compression_level;
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
@ -802,7 +807,7 @@ bool TCPHandler::isQueryCancelled()
if (state.is_cancelled || state.sent_all_data)
return true;
if (after_check_cancelled.elapsed() / 1000 < query_context.getSettingsRef().interactive_delay)
if (after_check_cancelled.elapsed() / 1000 < query_context->getSettingsRef().interactive_delay)
return false;
after_check_cancelled.restart();

View File

@ -95,6 +95,9 @@ public:
void run();
/// This method is called right before the query execution.
virtual void customizeContext(DB::Context & /*context*/) {}
private:
IServer & server;
Poco::Logger * log;
@ -106,7 +109,7 @@ private:
UInt64 client_revision = 0;
Context connection_context;
Context query_context;
std::optional<Context> query_context;
/// Streams for reading/writing from/to client connection socket.
std::shared_ptr<ReadBuffer> in;

View File

@ -4,7 +4,6 @@
<default>
<networks replace="replace">
<ip>::1</ip>
<ip>0.0.0.0</ip>
<ip>127.0.0.1</ip>
</networks>
</default>

View File

@ -157,6 +157,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
}
else
{
/// shutdown is true, simply finish the thread.
return;
}
}

View File

@ -10,6 +10,7 @@
#include <optional>
#include <ext/singleton.h>
#include <Poco/Event.h>
#include <Common/ThreadStatus.h>
@ -133,18 +134,19 @@ public:
template <typename Function, typename... Args>
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
: state(std::make_shared<Poco::Event>())
{
mutex = std::make_shared<std::mutex>();
/// The function object must be copyable, so we wrap lock_guard in shared_ptr.
/// NOTE: If this will throw an exception, the descructor won't be called.
GlobalThreadPool::instance().scheduleOrThrow([
mutex = mutex,
lock = std::make_shared<std::lock_guard<std::mutex>>(*mutex),
state = state,
func = std::forward<Function>(func),
args = std::make_tuple(std::forward<Args>(args)...)]
{
DB::ThreadStatus thread_status;
std::apply(func, args);
{
DB::ThreadStatus thread_status;
std::apply(func, args);
}
state->set();
});
}
@ -157,7 +159,7 @@ public:
{
if (joinable())
std::terminate();
mutex = std::move(rhs.mutex);
state = std::move(rhs.state);
return *this;
}
@ -171,26 +173,26 @@ public:
{
if (!joinable())
std::terminate();
{
std::lock_guard lock(*mutex);
}
mutex.reset();
state->wait();
state.reset();
}
void detach()
{
if (!joinable())
std::terminate();
mutex.reset();
state.reset();
}
bool joinable() const
{
return static_cast<bool>(mutex);
return state != nullptr;
}
private:
std::shared_ptr<std::mutex> mutex; /// Object must be moveable.
/// The state used in this object and inside the thread job.
std::shared_ptr<Poco::Event> state;
};

View File

@ -1,4 +1,4 @@
#include <atomic>
#include <mutex>
#include <iostream>
#include <Common/ThreadPool.h>
@ -10,8 +10,9 @@ void test()
{
Pool pool(10, 2, 10);
std::mutex mutex;
for (size_t i = 0; i < 10; ++i)
pool.schedule([]{ std::cerr << '.'; });
pool.schedule([&]{ std::lock_guard lock(mutex); std::cerr << '.'; });
pool.wait();
}

View File

@ -23,8 +23,11 @@ GraphiteRollupSortedBlockInputStream::GraphiteRollupSortedBlockInputStream(
for (const auto & pattern : params.patterns)
{
max_size_of_aggregate_state = std::max(max_size_of_aggregate_state, pattern.function->sizeOfData());
max_alignment_of_aggregate_state = std::max(max_alignment_of_aggregate_state, pattern.function->alignOfData());
if (pattern.function)
{
max_size_of_aggregate_state = std::max(max_size_of_aggregate_state, pattern.function->sizeOfData());
max_alignment_of_aggregate_state = std::max(max_alignment_of_aggregate_state, pattern.function->alignOfData());
}
}
place_for_aggregate_state.reset(max_size_of_aggregate_state, max_alignment_of_aggregate_state);
@ -41,13 +44,60 @@ GraphiteRollupSortedBlockInputStream::GraphiteRollupSortedBlockInputStream(
}
const Graphite::Pattern * GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const
Graphite::RollupRule GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const
{
for (const auto & pattern : params.patterns)
if (!pattern.regexp || pattern.regexp->match(path.data, path.size))
return &pattern;
const Graphite::Pattern * first_match = &undef_pattern;
return nullptr;
for (const auto & pattern : params.patterns)
{
if (!pattern.regexp)
{
/// Default pattern
if (first_match->type == first_match->TypeUndef && pattern.type == pattern.TypeAll)
{
/// There is only default pattern for both retention and aggregation
return std::pair(&pattern, &pattern);
}
if (pattern.type != first_match->type)
{
if (first_match->type == first_match->TypeRetention)
{
return std::pair(first_match, &pattern);
}
if (first_match->type == first_match->TypeAggregation)
{
return std::pair(&pattern, first_match);
}
}
}
else if (pattern.regexp->match(path.data, path.size))
{
/// General pattern with matched path
if (pattern.type == pattern.TypeAll)
{
/// Only for not default patterns with both function and retention parameters
return std::pair(&pattern, &pattern);
}
if (first_match->type == first_match->TypeUndef)
{
first_match = &pattern;
continue;
}
if (pattern.type != first_match->type)
{
if (first_match->type == first_match->TypeRetention)
{
return std::pair(first_match, &pattern);
}
if (first_match->type == first_match->TypeAggregation)
{
return std::pair(&pattern, first_match);
}
}
}
}
return {nullptr, nullptr};
}
@ -142,14 +192,15 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
if (started_rows)
accumulateRow(current_subgroup_newest_row);
const Graphite::Pattern * next_pattern = current_pattern;
Graphite::RollupRule next_rule = current_rule;
if (new_path)
next_pattern = selectPatternForPath(next_path);
next_rule = selectPatternForPath(next_path);
const Graphite::RetentionPattern * retention_pattern = std::get<0>(next_rule);
time_t next_time_rounded;
if (next_pattern)
if (retention_pattern)
{
UInt32 precision = selectPrecision(next_pattern->retentions, next_row_time);
UInt32 precision = selectPrecision(retention_pattern->retentions, next_row_time);
next_time_rounded = roundTimeToPrecision(date_lut, next_row_time, precision);
}
else
@ -177,7 +228,7 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
/// At this point previous row has been fully processed, so we can advance the loop
/// (substitute current_* values for next_*, advance the cursor).
startNextGroup(merged_columns, next_cursor, next_pattern);
startNextGroup(merged_columns, next_cursor, next_rule);
++started_rows;
current_time_rounded = next_time_rounded;
@ -229,8 +280,10 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
template <typename TSortCursor>
void GraphiteRollupSortedBlockInputStream::startNextGroup(MutableColumns & merged_columns, TSortCursor & cursor,
const Graphite::Pattern * next_pattern)
Graphite::RollupRule next_rule)
{
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(next_rule);
/// Copy unmodified column values (including path column).
for (size_t i = 0, size = unmodified_column_numbers.size(); i < size; ++i)
{
@ -238,13 +291,13 @@ void GraphiteRollupSortedBlockInputStream::startNextGroup(MutableColumns & merge
merged_columns[j]->insertFrom(*cursor->all_columns[j], cursor->pos);
}
if (next_pattern)
if (aggregation_pattern)
{
next_pattern->function->create(place_for_aggregate_state.data());
aggregation_pattern->function->create(place_for_aggregate_state.data());
aggregate_state_created = true;
}
current_pattern = next_pattern;
current_rule = next_rule;
}
@ -255,10 +308,11 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m
merged_columns[version_column_num]->insertFrom(
*(*current_subgroup_newest_row.columns)[version_column_num], current_subgroup_newest_row.row_num);
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
if (aggregate_state_created)
{
current_pattern->function->insertResultInto(place_for_aggregate_state.data(), *merged_columns[value_column_num]);
current_pattern->function->destroy(place_for_aggregate_state.data());
aggregation_pattern->function->insertResultInto(place_for_aggregate_state.data(), *merged_columns[value_column_num]);
aggregation_pattern->function->destroy(place_for_aggregate_state.data());
aggregate_state_created = false;
}
else
@ -269,8 +323,9 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m
void GraphiteRollupSortedBlockInputStream::accumulateRow(RowRef & row)
{
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
if (aggregate_state_created)
current_pattern->function->add(place_for_aggregate_state.data(), &(*row.columns)[value_column_num], row.row_num, nullptr);
aggregation_pattern->function->add(place_for_aggregate_state.data(), &(*row.columns)[value_column_num], row.row_num, nullptr);
}
}

View File

@ -27,11 +27,24 @@ namespace DB
*
* Each row in a table correspond to one value of one sensor.
*
* Pattern should contain function, retention scheme, or both of them. The order of patterns does mean as well:
* * Aggregation OR retention patterns should be first
* * Then aggregation AND retention full patterns have to be placed
* * default pattern without regexp must be the last
*
* Rollup rules are specified in the following way:
*
* pattern
* regexp
* function
* pattern
* regexp
* age -> precision
* age -> precision
* ...
* pattern
* regexp
* function
* age -> precision
* age -> precision
* ...
@ -54,6 +67,10 @@ namespace DB
*
* <graphite_rollup>
* <pattern>
* <regexp>\.max$</regexp>
* <function>max</function>
* </pattern>
* <pattern>
* <regexp>click_cost</regexp>
* <function>any</function>
* <retention>
@ -98,9 +115,12 @@ namespace Graphite
std::shared_ptr<OptimizedRegularExpression> regexp;
AggregateFunctionPtr function;
Retentions retentions; /// Must be ordered by 'age' descending.
enum { TypeUndef, TypeRetention, TypeAggregation, TypeAll } type = TypeAll; /// The type of defined pattern, filled automatically
};
using Patterns = std::vector<Pattern>;
using RetentionPattern = Pattern;
using AggregationPattern = Pattern;
struct Params
{
@ -110,6 +130,8 @@ namespace Graphite
String version_column_name;
Graphite::Patterns patterns;
};
using RollupRule = std::pair<const RetentionPattern *, const AggregationPattern *>;
}
/** Merges several sorted streams into one.
@ -135,7 +157,7 @@ public:
~GraphiteRollupSortedBlockInputStream() override
{
if (aggregate_state_created)
current_pattern->function->destroy(place_for_aggregate_state.data());
std::get<1>(current_rule)->function->destroy(place_for_aggregate_state.data());
}
protected:
@ -186,11 +208,18 @@ private:
time_t current_time = 0;
time_t current_time_rounded = 0;
const Graphite::Pattern * current_pattern = nullptr;
Graphite::RollupRule current_rule = {nullptr, nullptr};
AlignedBuffer place_for_aggregate_state;
bool aggregate_state_created = false; /// Invariant: if true then current_pattern is not NULL.
bool aggregate_state_created = false; /// Invariant: if true then current_rule is not NULL.
const Graphite::Pattern * selectPatternForPath(StringRef path) const;
const Graphite::Pattern undef_pattern =
{ /// temporary empty pattern for selectPatternForPath
nullptr,
nullptr,
DB::Graphite::Retentions(),
undef_pattern.TypeUndef,
};
Graphite::RollupRule selectPatternForPath(StringRef path) const;
UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const;
@ -198,7 +227,7 @@ private:
/// Insert the values into the resulting columns, which will not be changed in the future.
template <typename TSortCursor>
void startNextGroup(MutableColumns & merged_columns, TSortCursor & cursor, const Graphite::Pattern * next_pattern);
void startNextGroup(MutableColumns & merged_columns, TSortCursor & cursor, Graphite::RollupRule next_pattern);
/// Insert the calculated `time`, `value`, `version` values into the resulting columns by the last group of rows.
void finishCurrentGroup(MutableColumns & merged_columns);

View File

@ -116,7 +116,7 @@ public:
*/
size_t checkDepth(size_t max_depth) const { return checkDepthImpl(max_depth, max_depth); }
/// Do not allow to change the table while the blocks stream is alive.
/// Do not allow to change the table while the blocks stream and its children are alive.
void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
/// Get information about execution speed.
@ -242,6 +242,10 @@ public:
void enableExtremes() { enabled_extremes = true; }
protected:
/// Order is important: `table_locks` must be destroyed after `children` so that tables from
/// which child streams read are protected by the locks during the lifetime of the child streams.
TableStructureReadLocks table_locks;
BlockInputStreams children;
std::shared_mutex children_mutex;
@ -268,8 +272,6 @@ protected:
}
private:
TableStructureReadLocks table_locks;
bool enabled_extremes = false;
/// The limit on the number of rows/bytes has been exceeded, and you need to stop execution on the next `read` call, as if the thread has run out.

View File

@ -498,6 +498,15 @@ bool DataTypeArray::equals(const IDataType & rhs) const
}
size_t DataTypeArray::getNumberOfDimensions() const
{
const DataTypeArray * nested_array = typeid_cast<const DataTypeArray *>(nested.get());
if (!nested_array)
return 1;
return 1 + nested_array->getNumberOfDimensions(); /// Every modern C++ compiler optimizes tail recursion.
}
static DataTypePtr create(const ASTPtr & arguments)
{
if (!arguments || arguments->children.size() != 1)

View File

@ -112,6 +112,9 @@ public:
}
const DataTypePtr & getNestedType() const { return nested; }
/// 1 for plain array, 2 for array of arrays and so on.
size_t getNumberOfDimensions() const;
};
}

View File

@ -262,8 +262,10 @@ protected:
/** Text serialization with escaping but without quoting.
*/
public: // used somewhere in arcadia
virtual void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const = 0;
protected:
virtual void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const = 0;
/** Text serialization as a literal that may be inserted into a query.

View File

@ -1,11 +1,11 @@
#pragma once
#include <string>
class IBlockInputStream;
namespace DB
{
class IBlockInputStream;
/// Using in MySQLDictionarySource and XDBCDictionarySource after processing invalidate_query.
std::string readInvalidateQuery(IBlockInputStream & block_input_stream);

View File

@ -3,6 +3,7 @@
#include <Formats/ODBCDriver2BlockOutputStream.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
@ -82,8 +83,10 @@ void ODBCDriver2BlockOutputStream::writePrefix()
writeODBCString(out, "type");
for (size_t i = 0; i < columns; ++i)
{
const ColumnWithTypeAndName & col = header.getByPosition(i);
writeODBCString(out, col.type->getName());
auto type = header.getByPosition(i).type;
if (type->lowCardinality())
type = recursiveRemoveLowCardinality(type);
writeODBCString(out, type->getName());
}
}

View File

@ -238,29 +238,29 @@ using ConstAggregateDataPtr = const char *;
class ProtobufWriter
{
public:
bool writeNumber(Int8 value) { return false; }
bool writeNumber(UInt8 value) { return false; }
bool writeNumber(Int16 value) { return false; }
bool writeNumber(UInt16 value) { return false; }
bool writeNumber(Int32 value) { return false; }
bool writeNumber(UInt32 value) { return false; }
bool writeNumber(Int64 value) { return false; }
bool writeNumber(UInt64 value) { return false; }
bool writeNumber(UInt128 value) { return false; }
bool writeNumber(Float32 value) { return false; }
bool writeNumber(Float64 value) { return false; }
bool writeString(const StringRef & value) { return false; }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & name_value_pairs) {}
void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & name_value_pairs) {}
bool writeEnum(Int8 value) { return false; }
bool writeEnum(Int16 value) { return false; }
bool writeUUID(const UUID & value) { return false; }
bool writeDate(DayNum date) { return false; }
bool writeDateTime(time_t tm) { return false; }
bool writeDecimal(Decimal32 decimal, UInt32 scale) { return false; }
bool writeDecimal(Decimal64 decimal, UInt32 scale) { return false; }
bool writeDecimal(const Decimal128 & decimal, UInt32 scale) { return false; }
bool writeAggregateFunction(const AggregateFunctionPtr & function, ConstAggregateDataPtr place) { return false; }
bool writeNumber(Int8 /* value */) { return false; }
bool writeNumber(UInt8 /* value */) { return false; }
bool writeNumber(Int16 /* value */) { return false; }
bool writeNumber(UInt16 /* value */) { return false; }
bool writeNumber(Int32 /* value */) { return false; }
bool writeNumber(UInt32 /* value */) { return false; }
bool writeNumber(Int64 /* value */) { return false; }
bool writeNumber(UInt64 /* value */) { return false; }
bool writeNumber(UInt128 /* value */) { return false; }
bool writeNumber(Float32 /* value */) { return false; }
bool writeNumber(Float64 /* value */) { return false; }
bool writeString(const StringRef & /* value */) { return false; }
void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & /* name_value_pairs */) {}
void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & /* name_value_pairs */) {}
bool writeEnum(Int8 /* value */) { return false; }
bool writeEnum(Int16 /* value */) { return false; }
bool writeUUID(const UUID & /* value */) { return false; }
bool writeDate(DayNum /* date */) { return false; }
bool writeDateTime(time_t /* tm */) { return false; }
bool writeDecimal(Decimal32 /* decimal */, UInt32 /* scale */) { return false; }
bool writeDecimal(Decimal64 /* decimal */, UInt32 /* scale */) { return false; }
bool writeDecimal(const Decimal128 & /* decimal */, UInt32 /* scale */) { return false; }
bool writeAggregateFunction(const AggregateFunctionPtr & /* function */, ConstAggregateDataPtr /* place */) { return false; }
};
}

View File

@ -4,6 +4,7 @@
namespace DB
{
class FunctionArrayEnumerateDenseRanked : public FunctionArrayEnumerateRankedExtended<FunctionArrayEnumerateDenseRanked>
{
using Base = FunctionArrayEnumerateRankedExtended<FunctionArrayEnumerateDenseRanked>;

View File

@ -1,61 +1,53 @@
#include <algorithm>
#include <Columns/ColumnConst.h>
#include "arrayEnumerateRanked.h"
namespace DB
{
ArraysDepths getArraysDepths(const ColumnsWithTypeAndName & arguments)
{
const size_t num_arguments = arguments.size();
DepthType clear_depth = 1;
DepthType max_array_depth = 0;
DepthTypes depths;
/// function signature is the following:
/// f(c0, arr1, c1, arr2, c2, ...)
///
/// c0 is something called "clear_depth" here.
/// cN... - how deep to look into the corresponding arrN, (called "depths" here)
/// may be omitted - then it means "look at the full depth".
size_t array_num = 0;
DepthType last_array_depth = 0;
DepthType prev_array_depth = 0;
for (size_t i = 0; i < num_arguments; ++i)
{
const auto type = arguments[i].type;
const DataTypePtr & type = arguments[i].type;
const DataTypeArray * type_array = typeid_cast<const DataTypeArray *>(type.get());
if (isArray(type))
if (type_array)
{
if (depths.size() < array_num && last_array_depth)
if (depths.size() < array_num && prev_array_depth)
{
depths.emplace_back(last_array_depth);
last_array_depth = 0;
depths.emplace_back(prev_array_depth);
prev_array_depth = 0;
}
DepthType depth = 0;
auto sub_type = type;
do
{
auto sub_type_array = typeid_cast<const DataTypeArray *>(sub_type.get());
if (!sub_type_array)
break;
sub_type = sub_type_array->getNestedType();
++depth;
} while (isArray(sub_type));
last_array_depth = depth;
prev_array_depth = type_array->getNumberOfDimensions();
++array_num;
}
if (!arguments[i].column)
continue;
const IColumn * non_const = nullptr;
if (auto const_array_column = typeid_cast<const ColumnConst *>(arguments[i].column.get()))
non_const = const_array_column->getDataColumnPtr().get();
const auto array = typeid_cast<const ColumnArray *>(non_const ? non_const : arguments[i].column.get());
if (!array)
else
{
const auto & depth_column = arguments[i].column;
if (depth_column && depth_column->isColumnConst())
{
auto value = depth_column->getUInt(0);
UInt64 value = static_cast<const ColumnConst &>(*depth_column).getValue<UInt64>();
if (!value)
throw Exception(
"Arguments for function arrayEnumerateUniqRanked/arrayEnumerateDenseRanked incorrect: depth ("
+ std::to_string(value) + ") cant be 0.",
throw Exception("Incorrect arguments for function arrayEnumerateUniqRanked or arrayEnumerateDenseRanked: depth ("
+ std::to_string(value) + ") cannot be less or equal 0.",
ErrorCodes::BAD_ARGUMENTS);
if (i == 0)
@ -65,38 +57,35 @@ ArraysDepths getArraysDepths(const ColumnsWithTypeAndName & arguments)
else
{
if (depths.size() >= array_num)
{
throw Exception(
"Arguments for function arrayEnumerateUniqRanked/arrayEnumerateDenseRanked incorrect: depth ("
+ std::to_string(value) + ") for missing array.",
throw Exception("Incorrect arguments for function arrayEnumerateUniqRanked or arrayEnumerateDenseRanked: depth ("
+ std::to_string(value) + ") for missing array.",
ErrorCodes::BAD_ARGUMENTS);
}
if (value > prev_array_depth)
throw Exception(
"Arguments for function arrayEnumerateUniqRanked/arrayEnumerateDenseRanked incorrect: depth="
+ std::to_string(value) + " for array with depth=" + std::to_string(prev_array_depth) + ".",
ErrorCodes::BAD_ARGUMENTS);
depths.emplace_back(value);
}
}
}
}
if (depths.size() < array_num)
{
depths.emplace_back(last_array_depth);
}
for (auto & depth : depths)
{
if (max_array_depth < depth)
max_array_depth = depth;
}
depths.emplace_back(prev_array_depth);
if (depths.empty())
throw Exception(
"Arguments for function arrayEnumerateUniqRanked/arrayEnumerateDenseRanked incorrect: At least one array should be passed.",
throw Exception("Incorrect arguments for function arrayEnumerateUniqRanked or arrayEnumerateDenseRanked: at least one array should be passed.",
ErrorCodes::BAD_ARGUMENTS);
DepthType max_array_depth = 0;
for (auto depth : depths)
max_array_depth = std::max(depth, max_array_depth);
if (clear_depth > max_array_depth)
throw Exception(
"Arguments for function arrayEnumerateUniqRanked/arrayEnumerateDenseRanked incorrect: clear_depth ("
+ std::to_string(clear_depth) + ") cant be larger than max_array_depth (" + std::to_string(max_array_depth) + ").",
throw Exception("Incorrect arguments for function arrayEnumerateUniqRanked or arrayEnumerateDenseRanked: clear_depth ("
+ std::to_string(clear_depth) + ") cant be larger than max_array_depth (" + std::to_string(max_array_depth) + ").",
ErrorCodes::BAD_ARGUMENTS);
return {clear_depth, depths, max_array_depth};

View File

@ -12,6 +12,47 @@
#include <Common/HashTable/ClearableHashMap.h>
/** The function will enumerate distinct values of the passed multidimensional arrays looking inside at the specified depths.
* This is very unusual function made as a special order for Yandex.Metrica.
*
* arrayEnumerateUniqRanked(['hello', 'world', 'hello']) = [1, 1, 2]
* - it returns similar structured array containing number of occurence of the corresponding value.
*
* arrayEnumerateUniqRanked([['hello', 'world'], ['hello'], ['hello']], 1) = [1, 1, 2]
* - look at the depth 1 by default. Elements are ['hello', 'world'], ['hello'], ['hello'].
*
* arrayEnumerateUniqRanked([['hello', 'world'], ['hello'], ['hello']]) = [[1,1],[2],[3]]
* - look at the depth 2. Return similar structured array.
* arrayEnumerateUniqRanked([['hello', 'world'], ['hello'], ['hello']], 2) = [[1,1],[2],[3]]
* - look at the maximum depth by default.
*
* We may pass multiple array arguments. Their elements will be processed as zipped to tuple.
*
* arrayEnumerateUniqRanked(['hello', 'hello', 'world', 'world'], ['a', 'b', 'b', 'b']) = [1, 1, 1, 2]
*
* We may provide arrays of different depths to look at different arguments.
*
* arrayEnumerateUniqRanked([['hello', 'world'], ['hello'], ['world'], ['world']], ['a', 'b', 'b', 'b']) = [[1,1],[1],[1],[2]]
* arrayEnumerateUniqRanked([['hello', 'world'], ['hello'], ['world'], ['world']], 1, ['a', 'b', 'b', 'b'], 1) = [1, 1, 1, 2]
*
* When depths are different, we process less deep arrays as promoted to deeper arrays of similar structure by duplicating elements.
*
* arrayEnumerateUniqRanked(
* [['hello', 'world'], ['hello'], ['world'], ['world']],
* ['a', 'b', 'b', 'b'])
* = arrayEnumerateUniqRanked(
* [['hello', 'world'], ['hello'], ['world'], ['world']],
* [['a', 'a'], ['b'], ['b'], ['b']])
*
* Finally, we can provide extra first argument named "clear_depth" (it can be considered as 1 by default).
* Array elements at the clear_depth will be enumerated as separate elements (enumeration counter is reset for each new element).
*
* SELECT arrayEnumerateUniqRanked(1, [['hello', 'world'], ['hello'], ['world'], ['world']]) = [[1,1],[2],[2],[3]]
* SELECT arrayEnumerateUniqRanked(2, [['hello', 'world'], ['hello'], ['world'], ['world']]) = [[1,1],[1],[1],[1]]
* SELECT arrayEnumerateUniqRanked(1, [['hello', 'world', 'hello'], ['hello'], ['world'], ['world']]) = [[1,1,2],[3],[2],[3]]
* SELECT arrayEnumerateUniqRanked(2, [['hello', 'world', 'hello'], ['hello'], ['world'], ['world']]) = [[1,1,2],[1],[1],[1]]
*/
namespace DB
{
namespace ErrorCodes
@ -27,12 +68,21 @@ class FunctionArrayEnumerateDenseRanked;
using DepthType = uint32_t;
using DepthTypes = std::vector<DepthType>;
struct ArraysDepths
{
/// Enumerate elements at the specified level separately.
DepthType clear_depth;
/// Effective depth is the array depth by default or lower value, specified as a constant argument following the array.
/// f([[1, 2], [3]]) - effective depth is 2.
/// f([[1, 2], [3]], 1) - effective depth is 1.
DepthTypes depths;
/// Maximum effective depth.
DepthType max_array_depth;
};
/// Return depth info about passed arrays
ArraysDepths getArraysDepths(const ColumnsWithTypeAndName & arguments);
@ -55,7 +105,9 @@ public:
+ ", should be at least 1.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto & arrays_depths = getArraysDepths(arguments);
const ArraysDepths arrays_depths = getArraysDepths(arguments);
/// Return type is the array of the depth as the maximum effective depth of arguments, containing UInt32.
DataTypePtr type = std::make_shared<DataTypeUInt32>();
for (DepthType i = 0; i < arrays_depths.max_array_depth; ++i)
@ -79,15 +131,15 @@ private:
/// Hash a set of keys into a UInt128 value.
static inline UInt128 ALWAYS_INLINE hash128depths(const std::vector<size_t> & indexes, const ColumnRawPtrs & key_columns)
static inline UInt128 ALWAYS_INLINE hash128depths(const std::vector<size_t> & indices, const ColumnRawPtrs & key_columns)
{
UInt128 key;
SipHash hash;
for (size_t j = 0, keys_size = key_columns.size(); j < keys_size; ++j)
{
// Debug: const auto & field = (*key_columns[j])[indexes[j]]; DUMP(j, indexes[j], field);
key_columns[j]->updateHashWithValue(indexes[j], hash);
// Debug: const auto & field = (*key_columns[j])[indices[j]]; DUMP(j, indices[j], field);
key_columns[j]->updateHashWithValue(indices[j], hash);
}
hash.get128(key.low, key.high);
@ -111,9 +163,11 @@ void FunctionArrayEnumerateRankedExtended<Derived>::executeImpl(
for (size_t i = 0; i < arguments.size(); ++i)
args.emplace_back(block.getByPosition(arguments[i]));
const auto & arrays_depths = getArraysDepths(args);
const ArraysDepths arrays_depths = getArraysDepths(args);
auto get_array_column = [&](const auto & column) -> const DB::ColumnArray * {
/// If the column is Array - return it. If the const Array - materialize it, keep ownership and return.
auto get_array_column = [&](const auto & column) -> const DB::ColumnArray *
{
const ColumnArray * array = checkAndGetColumn<ColumnArray>(column);
if (!array)
{
@ -146,7 +200,7 @@ void FunctionArrayEnumerateRankedExtended<Derived>::executeImpl(
if (*offsets_by_depth[0] != array->getOffsets())
{
throw Exception(
"Lengths and depths of all arrays passed to " + getName() + " must be equal.",
"Lengths and effective depths of all arrays passed to " + getName() + " must be equal.",
ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
}
}
@ -170,7 +224,7 @@ void FunctionArrayEnumerateRankedExtended<Derived>::executeImpl(
if (*offsets_by_depth[col_depth] != array->getOffsets())
{
throw Exception(
"Lengths and depths of all arrays passed to " + getName() + " must be equal.",
"Lengths and effective depths of all arrays passed to " + getName() + " must be equal.",
ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
}
}
@ -180,7 +234,7 @@ void FunctionArrayEnumerateRankedExtended<Derived>::executeImpl(
{
throw Exception(
getName() + ": Passed array number " + std::to_string(array_num) + " depth ("
+ std::to_string(arrays_depths.depths[array_num]) + ") more than actual array depth (" + std::to_string(col_depth)
+ std::to_string(arrays_depths.depths[array_num]) + ") is more than the actual array depth (" + std::to_string(col_depth)
+ ").",
ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
}
@ -251,6 +305,7 @@ void FunctionArrayEnumerateRankedExtended<Derived>::executeMethodImpl(
const ArraysDepths & arrays_depths,
ColumnUInt32::Container & res_values)
{
/// Offsets at the depth we want to look.
const size_t current_offset_depth = arrays_depths.max_array_depth;
const auto & offsets = *offsets_by_depth[current_offset_depth - 1];
@ -264,22 +319,24 @@ void FunctionArrayEnumerateRankedExtended<Derived>::executeMethodImpl(
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
Map indices;
std::vector<size_t> indexes_by_depth(arrays_depths.max_array_depth);
std::vector<size_t> indices_by_depth(arrays_depths.max_array_depth);
std::vector<size_t> current_offset_n_by_depth(arrays_depths.max_array_depth);
UInt32 rank = 0;
std::vector<size_t> columns_indexes(columns.size());
std::vector<size_t> columns_indices(columns.size());
for (size_t off : offsets)
{
bool want_clear = false;
/// For each element at the depth we want to look.
for (size_t j = prev_off; j < off; ++j)
{
for (size_t col_n = 0; col_n < columns.size(); ++col_n)
columns_indexes[col_n] = indexes_by_depth[arrays_depths.depths[col_n] - 1];
columns_indices[col_n] = indices_by_depth[arrays_depths.depths[col_n] - 1];
auto hash = hash128depths(columns_indexes, columns);
auto hash = hash128depths(columns_indices, columns);
if constexpr (std::is_same_v<Derived, FunctionArrayEnumerateUniqRanked>)
{
@ -297,13 +354,13 @@ void FunctionArrayEnumerateRankedExtended<Derived>::executeMethodImpl(
res_values[j] = idx;
}
// Debug: DUMP(off, prev_off, j, columns_indexes, res_values[j], columns);
// Debug: DUMP(off, prev_off, j, columns_indices, res_values[j], columns);
for (int depth = current_offset_depth - 1; depth >= 0; --depth)
{
++indexes_by_depth[depth];
++indices_by_depth[depth];
if (indexes_by_depth[depth] == (*offsets_by_depth[depth])[current_offset_n_by_depth[depth]])
if (indices_by_depth[depth] == (*offsets_by_depth[depth])[current_offset_n_by_depth[depth]])
{
if (static_cast<int>(arrays_depths.clear_depth) == depth + 1)
want_clear = true;
@ -315,6 +372,7 @@ void FunctionArrayEnumerateRankedExtended<Derived>::executeMethodImpl(
}
}
}
if (want_clear)
{
want_clear = false;

View File

@ -1818,6 +1818,19 @@ void Context::addXDBCBridgeCommand(std::unique_ptr<ShellCommand> cmd)
shared->bridge_commands.emplace_back(std::move(cmd));
}
IHostContextPtr & Context::getHostContext()
{
return host_context;
}
const IHostContextPtr & Context::getHostContext() const
{
return host_context;
}
std::shared_ptr<ActionLocksManager> Context::getActionLocksManager()
{
auto lock = getLock();

View File

@ -99,6 +99,15 @@ using TableAndCreateASTs = std::map<String, TableAndCreateAST>;
/// Callback for external tables initializer
using ExternalTablesInitializer = std::function<void(Context &)>;
/// An empty interface for an arbitrary object that may be attached by a shared pointer
/// to query context, when using ClickHouse as a library.
struct IHostContext
{
virtual ~IHostContext() = default;
};
using IHostContextPtr = std::shared_ptr<IHostContext>;
/** A set of known objects that can be used in the query.
* Consists of a shared part (always common to all sessions and queries)
* and copied part (which can be its own for each session or query).
@ -139,6 +148,12 @@ private:
using DatabasePtr = std::shared_ptr<IDatabase>;
using Databases = std::map<String, std::shared_ptr<IDatabase>>;
IHostContextPtr host_context; /// Arbitrary object that may used to attach some host specific information to query context,
/// when using ClickHouse as a library in some project. For example, it may contain host
/// logger, some query identification information, profiling guards, etc. This field is
/// to be customized in HTTP and TCP servers by overloading the customizeContext(DB::Context&)
/// methods.
/// Use copy constructor or createGlobal() instead
Context();
@ -452,6 +467,9 @@ public:
/// Add started bridge command. It will be killed after context destruction
void addXDBCBridgeCommand(std::unique_ptr<ShellCommand> cmd);
IHostContextPtr & getHostContext();
const IHostContextPtr & getHostContext() const;
private:
/** Check if the current client has access to the specified database.
* If access is denied, throw an exception.

View File

@ -192,6 +192,9 @@ void CrossToInnerJoinMatcher::visit(ASTSelectQuery & select, ASTPtr & ast, Data
using CheckExpressionMatcher = OneTypeMatcher<CheckExpressionVisitorData, false>;
using CheckExpressionVisitor = InDepthNodeVisitor<CheckExpressionMatcher, true>;
if (!select.where_expression)
return;
std::vector<DatabaseAndTableWithAlias> table_names;
ASTPtr ast_join = getCrossJoin(select, table_names);
if (!ast_join)
@ -215,10 +218,10 @@ void CrossToInnerJoinMatcher::visit(ASTSelectQuery & select, ASTPtr & ast, Data
select.where_expression.reset();
join.children.push_back(join.on_expression);
ast = ast->clone(); /// rewrite AST in right manner
data.done = true;
}
ast = ast->clone(); /// rewrite AST in right manner
data.done = true;
}
}

View File

@ -62,11 +62,11 @@ bool DatabaseAndTableWithAlias::satisfies(const DatabaseAndTableWithAlias & db_t
return database == db_table.database && table == db_table.table;
}
String DatabaseAndTableWithAlias::getQualifiedNamePrefix() const
String DatabaseAndTableWithAlias::getQualifiedNamePrefix(bool with_dot) const
{
if (alias.empty() && table.empty())
return "";
return (!alias.empty() ? alias : table) + '.';
return (!alias.empty() ? alias : table) + (with_dot ? "." : "");
}
std::vector<const ASTTableExpression *> getSelectTablesExpression(const ASTSelectQuery & select_query)

View File

@ -32,7 +32,7 @@ struct DatabaseAndTableWithAlias
DatabaseAndTableWithAlias(const ASTTableExpression & table_expression, const String & current_database = "");
/// "alias." or "table." if alias is empty
String getQualifiedNamePrefix() const;
String getQualifiedNamePrefix(bool with_dot = true) const;
/// Check if it satisfies another db_table name. @note opterion is not symmetric.
bool satisfies(const DatabaseAndTableWithAlias & table, bool table_may_be_an_alias);

View File

@ -36,9 +36,10 @@ struct ColumnAliasesMatcher
{
const std::vector<DatabaseAndTableWithAlias> tables;
bool public_names;
AsteriskSemantic::RevertedAliases rev_aliases;
std::unordered_map<String, String> aliases;
AsteriskSemantic::RevertedAliases rev_aliases; /// long_name -> aliases
std::unordered_map<String, String> aliases; /// alias -> long_name
std::vector<std::pair<ASTIdentifier *, bool>> compound_identifiers;
std::set<String> allowed_long_names; /// original names allowed as aliases '--t.x as t.x' (select expressions only).
Data(std::vector<DatabaseAndTableWithAlias> && tables_)
: tables(tables_)
@ -51,29 +52,37 @@ struct ColumnAliasesMatcher
for (auto & [identifier, is_public] : compound_identifiers)
{
auto it = rev_aliases.find(identifier->name);
String long_name = identifier->name;
auto it = rev_aliases.find(long_name);
if (it == rev_aliases.end())
{
bool last_table = IdentifierSemantic::canReferColumnToTable(*identifier, tables.back());
if (!last_table)
{
String long_name = identifier->name;
String alias = hide_prefix + long_name;
aliases[alias] = long_name;
rev_aliases[long_name].push_back(alias);
identifier->setShortName(alias);
if (is_public)
{
identifier->setAlias(long_name);
allowed_long_names.insert(long_name);
}
}
else if (is_public)
identifier->setAlias(identifier->name); /// prevent crop long to short name
identifier->setAlias(long_name); /// prevent crop long to short name
}
else
{
if (it->second.empty())
throw Exception("No alias for '" + identifier->name + "'", ErrorCodes::LOGICAL_ERROR);
identifier->setShortName(it->second[0]);
throw Exception("No alias for '" + long_name + "'", ErrorCodes::LOGICAL_ERROR);
if (is_public && allowed_long_names.count(long_name))
; /// leave original name unchanged for correct output
else
identifier->setShortName(it->second[0]);
}
}
}
@ -131,7 +140,7 @@ struct ColumnAliasesMatcher
node.setAlias("");
}
}
else
else if (node.compound())
data.compound_identifiers.emplace_back(&node, data.public_names);
}
};

View File

@ -103,7 +103,7 @@ struct Settings
\
M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3, "The minimum length of the expression `expr = x1 OR ... expr = xN` for optimization ") \
\
M(SettingUInt64, min_bytes_to_use_direct_io, 0, "The minimum number of bytes for input/output operations is bypassing the page cache. 0 - disabled.") \
M(SettingUInt64, min_bytes_to_use_direct_io, 0, "The minimum number of bytes for reading the data with O_DIRECT option during SELECT queries execution. 0 - disabled.") \
\
M(SettingBool, force_index_by_date, 0, "Throw an exception if there is a partition key in a table, and it is not used.") \
M(SettingBool, force_primary_key, 0, "Throw an exception if there is primary key in a table, and it is not used.") \

View File

@ -652,7 +652,8 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
{
if (const ASTTablesInSelectQueryElement * node = select_query->join())
{
replaceJoinedTable(node);
if (settings.enable_optimize_predicate_expression)
replaceJoinedTable(node);
const auto & joined_expression = static_cast<const ASTTableExpression &>(*node->table_expression);
DatabaseAndTableWithAlias table(joined_expression, context.getCurrentDatabase());

View File

@ -143,21 +143,14 @@ void TranslateQualifiedNamesMatcher::visit(ASTSelectQuery & select, const ASTPtr
Visitor(data).visit(*add_node);
}
/// qualifed names for duplicates
static std::shared_ptr<ASTIdentifier> makeIdentifier(const String & short_name, const String & long_name, bool need_long_name)
static void addIdentifier(ASTs & nodes, const String & table_name, const String & column_name, AsteriskSemantic::RevertedAliasesPtr aliases)
{
if (need_long_name)
return std::make_shared<ASTIdentifier>(long_name);
return std::make_shared<ASTIdentifier>(short_name);
}
auto identifier = std::make_shared<ASTIdentifier>(std::vector<String>{table_name, column_name});
static void addIdentifier(ASTs & nodes, std::shared_ptr<ASTIdentifier> identifier, const String & long_name,
AsteriskSemantic::RevertedAliasesPtr aliases)
{
bool added = false;
if (aliases && aliases->count(long_name))
if (aliases && aliases->count(identifier->name))
{
for (const String & alias : (*aliases)[long_name])
for (const String & alias : (*aliases)[identifier->name])
{
nodes.push_back(identifier->clone());
nodes.back()->setAlias(alias);
@ -173,7 +166,6 @@ static void addIdentifier(ASTs & nodes, std::shared_ptr<ASTIdentifier> identifie
void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPtr &, Data & data)
{
const auto & tables_with_columns = data.tables;
const auto & source_columns = data.source_columns;
ASTs old_children;
if (data.processAsterisks())
@ -208,16 +200,14 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
if (const auto * asterisk = typeid_cast<const ASTAsterisk *>(child.get()))
{
bool first_table = true;
for (const auto & [table_name, table_columns] : tables_with_columns)
for (const auto & [table, table_columns] : tables_with_columns)
{
for (const auto & column_name : table_columns)
{
if (first_table || !data.join_using_columns.count(column_name))
{
bool need_prefix = !first_table && source_columns.count(column_name);
String long_name = table_name.getQualifiedNamePrefix() + column_name;
auto identifier = makeIdentifier(column_name, long_name, need_prefix);
addIdentifier(node.children, identifier, long_name, AsteriskSemantic::getAliases(*asterisk));
String table_name = table.getQualifiedNamePrefix(false);
addIdentifier(node.children, table_name, column_name, AsteriskSemantic::getAliases(*asterisk));
}
}
@ -228,22 +218,17 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
{
DatabaseAndTableWithAlias ident_db_and_name(qualified_asterisk->children[0]);
bool first_table = true;
for (const auto & [table_name, table_columns] : tables_with_columns)
for (const auto & [table, table_columns] : tables_with_columns)
{
if (ident_db_and_name.satisfies(table_name, true))
if (ident_db_and_name.satisfies(table, true))
{
for (const auto & column_name : table_columns)
{
bool need_prefix = !first_table && source_columns.count(column_name);
String long_name = table_name.getQualifiedNamePrefix() + column_name;
auto identifier = makeIdentifier(column_name, long_name, need_prefix);
addIdentifier(node.children, identifier, long_name, AsteriskSemantic::getAliases(*qualified_asterisk));
String table_name = table.getQualifiedNamePrefix(false);
addIdentifier(node.children, table_name, column_name, AsteriskSemantic::getAliases(*qualified_asterisk));
}
break;
}
first_table = false;
}
}
else

View File

@ -29,6 +29,10 @@ ASTIdentifier::ASTIdentifier(const String & name_, std::vector<String> && name_p
{
}
ASTIdentifier::ASTIdentifier(std::vector<String> && name_parts_)
: ASTIdentifier(name_parts_.at(0) + '.' + name_parts_.at(1), std::move(name_parts_))
{}
void ASTIdentifier::setShortName(const String & new_name)
{
name = new_name;
@ -48,9 +52,8 @@ void ASTIdentifier::formatImplWithoutAlias(const FormatSettings & settings, Form
settings.ostr << (settings.hilite ? hilite_none : "");
};
/// A simple or compound identifier?
if (name_parts.size() > 1)
/// It could be compound but short
if (!isShort())
{
for (size_t i = 0, size = name_parts.size(); i < size; ++i)
{

View File

@ -22,6 +22,7 @@ public:
String name;
ASTIdentifier(const String & name_, std::vector<String> && name_parts_ = {});
ASTIdentifier(std::vector<String> && name_parts_);
/** Get the text that identifies this element. */
String getID(char delim) const override { return "Identifier" + (delim + name); }

View File

@ -11,6 +11,7 @@
#include <Storages/MergeTree/MarkRange.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <DataTypes/DataTypeLowCardinality.h>
constexpr auto INDEX_FILE_PREFIX = "skp_idx_";

View File

@ -18,7 +18,7 @@ namespace ErrorCodes
}
/// 0b11 -- can be true and false at the same time
const Field UNKNOWN_FIELD(3);
const Field UNKNOWN_FIELD(3u);
MergeTreeSetIndexGranule::MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index)
@ -47,7 +47,16 @@ void MergeTreeSetIndexGranule::serializeBinary(WriteBuffer & ostr) const
for (size_t i = 0; i < index.columns.size(); ++i)
{
const auto & type = index.data_types[i];
type->serializeBinaryBulk(*columns[i], ostr, 0, size());
IDataType::SerializeBinaryBulkSettings settings;
settings.getter = [&ostr](IDataType::SubstreamPath) -> WriteBuffer * { return &ostr; };
settings.position_independent_encoding = false;
settings.low_cardinality_max_dictionary_size = 0;
IDataType::SerializeBinaryBulkStatePtr state;
type->serializeBinaryBulkStatePrefix(settings, state);
type->serializeBinaryBulkWithMultipleStreams(*columns[i], 0, size(), settings, state);
type->serializeBinaryBulkStateSuffix(settings, state);
}
}
@ -66,11 +75,21 @@ void MergeTreeSetIndexGranule::deserializeBinary(ReadBuffer & istr)
size_type->deserializeBinary(field_rows, istr);
size_t rows_to_read = field_rows.get<size_t>();
if (rows_to_read == 0)
return;
for (size_t i = 0; i < index.columns.size(); ++i)
{
const auto & type = index.data_types[i];
auto new_column = type->createColumn();
type->deserializeBinaryBulk(*new_column, istr, rows_to_read, 0);
IDataType::DeserializeBinaryBulkSettings settings;
settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return &istr; };
settings.position_independent_encoding = false;
IDataType::DeserializeBinaryBulkStatePtr state;
type->deserializeBinaryBulkStatePrefix(settings, state);
type->deserializeBinaryBulkWithMultipleStreams(*new_column, rows_to_read, settings, state);
block.insert(ColumnWithTypeAndName(new_column->getPtr(), type, index.columns[i]));
}
@ -177,10 +196,24 @@ bool SetIndexCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule)
Block result = granule->getElementsBlock();
actions->execute(result);
const auto & column = result.getByName(expression_ast->getColumnName()).column;
auto column = result.getByName(expression_ast->getColumnName()).column->convertToFullColumnIfLowCardinality();
auto * col_uint8 = typeid_cast<const ColumnUInt8 *>(column.get());
const NullMap * null_map = nullptr;
if (auto * col_nullable = typeid_cast<const ColumnNullable *>(column.get()))
{
col_uint8 = typeid_cast<const ColumnUInt8 *>(&col_nullable->getNestedColumn());
null_map = &col_nullable->getNullMapData();
}
if (!col_uint8)
throw Exception("ColumnUInt8 expected as Set index condition result.", ErrorCodes::LOGICAL_ERROR);
auto & condition = col_uint8->getData();
for (size_t i = 0; i < column->size(); ++i)
if (column->getInt(i) & 1)
if ((!null_map || (*null_map)[i] == 0) && condition[i] & 1)
return true;
return false;

View File

@ -126,17 +126,32 @@ static void appendGraphitePattern(
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (!pattern.function)
throw Exception("Aggregate function is mandatory for retention patterns in GraphiteMergeTree",
if (!pattern.function && pattern.retentions.empty())
throw Exception("At least one of an aggregate function or retention rules is mandatory for rollup patterns in GraphiteMergeTree",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
if (pattern.function->allocatesMemoryInArena())
throw Exception("Aggregate function " + pattern.function->getName() + " isn't supported in GraphiteMergeTree",
ErrorCodes::NOT_IMPLEMENTED);
if (!pattern.function)
{
pattern.type = pattern.TypeRetention;
}
else if (pattern.retentions.empty())
{
pattern.type = pattern.TypeAggregation;
}
else
{
pattern.type = pattern.TypeAll;
}
if (pattern.type & pattern.TypeAggregation) /// TypeAggregation or TypeAll
if (pattern.function->allocatesMemoryInArena())
throw Exception("Aggregate function " + pattern.function->getName() + " isn't supported in GraphiteMergeTree",
ErrorCodes::NOT_IMPLEMENTED);
/// retention should be in descending order of age.
std::sort(pattern.retentions.begin(), pattern.retentions.end(),
[] (const Graphite::Retention & a, const Graphite::Retention & b) { return a.age > b.age; });
if (pattern.type & pattern.TypeRetention) /// TypeRetention or TypeAll
std::sort(pattern.retentions.begin(), pattern.retentions.end(),
[] (const Graphite::Retention & a, const Graphite::Retention & b) { return a.age > b.age; });
patterns.emplace_back(pattern);
}

View File

@ -148,13 +148,26 @@ void StorageSystemGraphite::fillData(MutableColumns & res_columns, const Context
const auto patterns = readPatterns(config, section);
for (const auto & pattern : patterns)
{
for (const auto & ret : pattern.retentions)
if (!pattern.retentions.empty())
{
for (const auto & ret : pattern.retentions)
{
res_columns[0]->insert(section);
res_columns[1]->insert(pattern.regexp);
res_columns[2]->insert(pattern.function);
res_columns[3]->insert(ret.age);
res_columns[4]->insert(ret.precision);
res_columns[5]->insert(pattern.priority);
res_columns[6]->insert(pattern.is_default);
}
}
else
{
res_columns[0]->insert(section);
res_columns[1]->insert(pattern.regexp);
res_columns[2]->insert(pattern.function);
res_columns[3]->insert(ret.age);
res_columns[4]->insert(ret.precision);
res_columns[3]->insert(0);
res_columns[4]->insert(0);
res_columns[5]->insert(pattern.priority);
res_columns[6]->insert(pattern.is_default);
}

View File

@ -5,6 +5,29 @@
<time_column_name>timestamp</time_column_name>
<value_column_name>value</value_column_name>
<version_column_name>updated</version_column_name>
<pattern>
<regexp>\.count$</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>\.max$</regexp>
<function>max</function>
</pattern>
<pattern>
<regexp>^five_min\.</regexp>
<retention>
<age>0</age>
<precision>300</precision>
</retention>
<retention>
<age>5184000</age>
<precision>3600</precision>
</retention>
<retention>
<age>31536000</age>
<precision>14400</precision>
</retention>
</pattern>
<pattern>
<regexp>^one_min</regexp>
<function>avg</function>
@ -22,4 +45,53 @@
</retention>
</pattern>
</graphite_rollup>
<graphite_rollup_with_default>
<path_column_name>metric</path_column_name>
<time_column_name>timestamp</time_column_name>
<value_column_name>value</value_column_name>
<version_column_name>updated</version_column_name>
<pattern>
<regexp>\.count$</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>\.max$</regexp>
<function>max</function>
</pattern>
<default>
<function>any</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</default>
</graphite_rollup_with_default>
<graphite_rollup_broken>
<path_column_name>metric</path_column_name>
<time_column_name>timestamp</time_column_name>
<value_column_name>value</value_column_name>
<version_column_name>updated</version_column_name>
<default>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</default>
</graphite_rollup_broken>
</yandex>

View File

@ -8,31 +8,38 @@ from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', main_configs=['configs/graphite_rollup.xml'])
instance = cluster.add_instance('instance',
main_configs=['configs/graphite_rollup.xml'])
q = instance.query
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
instance.query('CREATE DATABASE test')
q('CREATE DATABASE test')
yield cluster
finally:
cluster.shutdown()
@pytest.fixture
def graphite_table(started_cluster):
instance.query('''
q('''
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
ENGINE = GraphiteMergeTree('graphite_rollup')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=8192;
''')
yield
instance.query('DROP TABLE test.graphite')
q('DROP TABLE test.graphite')
def test_rollup_versions(graphite_table):
@ -40,13 +47,14 @@ def test_rollup_versions(graphite_table):
rounded_timestamp = timestamp - timestamp % 60
date = datetime.date.today().isoformat()
q = instance.query
# Insert rows with timestamps relative to the current time so that the first retention clause is active.
# Insert rows with timestamps relative to the current time so that the
# first retention clause is active.
# Two parts are created.
q('''
INSERT INTO test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 100, {timestamp}, '{date}', 1);
INSERT INTO test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 200, {timestamp}, '{date}', 2);
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('one_min.x1', 100, {timestamp}, '{date}', 1);
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('one_min.x1', 200, {timestamp}, '{date}', 2);
'''.format(timestamp=timestamp, date=date))
expected1 = '''\
@ -54,7 +62,9 @@ one_min.x1 100 {timestamp} {date} 1
one_min.x1 200 {timestamp} {date} 2
'''.format(timestamp=timestamp, date=date)
assert TSV(q('SELECT * FROM test.graphite ORDER BY updated')) == TSV(expected1)
assert TSV(
q('SELECT * FROM test.graphite ORDER BY updated')
) == TSV(expected1)
q('OPTIMIZE TABLE test.graphite')
@ -67,8 +77,6 @@ one_min.x1 200 {timestamp} {date} 2
def test_rollup_aggregation(graphite_table):
q = instance.query
# This query essentially emulates what rollup does.
result1 = q('''
SELECT avg(v), max(upd)
@ -91,7 +99,8 @@ FROM (SELECT timestamp,
'''
assert TSV(result1) == TSV(expected1)
# Timestamp 1111111111 is in sufficiently distant past so that the last retention clause is active.
# Timestamp 1111111111 is in sufficiently distant past
# so that the last retention clause is active.
result2 = q('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
@ -114,7 +123,7 @@ one_min.x 999634.9918367347 1111444200 2017-02-02 499999
def test_rollup_aggregation_2(graphite_table):
result = instance.query('''
result = q('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
@ -136,7 +145,7 @@ one_min.x 24 1111110600 2017-02-02 100
def test_multiple_paths_and_versions(graphite_table):
result = instance.query('''
result = q('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
@ -163,7 +172,9 @@ OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
with open(p.join(p.dirname(__file__), 'test_multiple_paths_and_versions.reference')) as reference:
with open(p.join(p.dirname(__file__),
'test_multiple_paths_and_versions.reference')
) as reference:
assert TSV(result) == TSV(reference)
@ -177,14 +188,18 @@ def test_multiple_output_blocks(graphite_table):
for j in range(3):
cur_time = rolled_up_time + 100 * j
to_insert += 'one_min.x1 {} {} 2001-09-09 1\n'.format(10 * j, cur_time)
to_insert += 'one_min.x1 {} {} 2001-09-09 2\n'.format(10 * (j + 1), cur_time)
to_insert += 'one_min.x1 {} {} 2001-09-09 1\n'.format(
10 * j, cur_time
)
to_insert += 'one_min.x1 {} {} 2001-09-09 2\n'.format(
10 * (j + 1), cur_time
)
expected += 'one_min.x1 20 {} 2001-09-09 2\n'.format(rolled_up_time)
instance.query('INSERT INTO test.graphite FORMAT TSV', to_insert)
q('INSERT INTO test.graphite FORMAT TSV', to_insert)
result = instance.query('''
result = q('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
@ -200,14 +215,14 @@ zzzzzzzz 100 1000000001 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
instance.query('INSERT INTO test.graphite FORMAT TSV', to_insert)
q('INSERT INTO test.graphite FORMAT TSV', to_insert)
expected = '''\
one_min.x1 100 999999600 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
result = instance.query('''
result = q('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
@ -215,27 +230,171 @@ SELECT * FROM test.graphite;
assert TSV(result) == TSV(expected)
def test_path_dangling_pointer(graphite_table):
instance.query('''
q('''
DROP TABLE IF EXISTS test.graphite2;
CREATE TABLE test.graphite2
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree(date, (metric, timestamp), 1, 'graphite_rollup');
''')
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree('graphite_rollup')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=1;
''')
path = 'abcd' * 4000000 # 16MB
instance.query('INSERT INTO test.graphite2 FORMAT TSV', "{}\t0.0\t0\t2018-01-01\t100\n".format(path))
instance.query('INSERT INTO test.graphite2 FORMAT TSV', "{}\t0.0\t0\t2018-01-01\t101\n".format(path))
path = 'abcd' * 4000000 # 16MB
q('INSERT INTO test.graphite2 FORMAT TSV',
"{}\t0.0\t0\t2018-01-01\t100\n".format(path))
q('INSERT INTO test.graphite2 FORMAT TSV',
"{}\t0.0\t0\t2018-01-01\t101\n".format(path))
for version in range(10):
instance.query('INSERT INTO test.graphite2 FORMAT TSV', "{}\t0.0\t0\t2018-01-01\t{}\n".format(path, version))
q('INSERT INTO test.graphite2 FORMAT TSV',
"{}\t0.0\t0\t2018-01-01\t{}\n".format(path, version))
while True:
instance.query('OPTIMIZE TABLE test.graphite2 PARTITION 201801 FINAL')
parts = int(instance.query("SELECT count() FROM system.parts WHERE active AND database='test' AND table='graphite2'"))
if parts == 1:
break
print "Parts", parts
q('OPTIMIZE TABLE test.graphite2 PARTITION 201801 FINAL')
parts = int(q("SELECT count() FROM system.parts "
"WHERE active AND database='test' "
"AND table='graphite2'"))
if parts == 1:
break
print('Parts', parts)
assert TSV(instance.query("SELECT value, timestamp, date, updated FROM test.graphite2")) == TSV("0\t0\t2018-01-01\t101\n")
assert TSV(
q("SELECT value, timestamp, date, updated FROM test.graphite2")
) == TSV("0\t0\t2018-01-01\t101\n")
instance.query('DROP TABLE test.graphite2')
q('DROP TABLE test.graphite2')
def test_combined_rules(graphite_table):
# 1487970000 ~ Sat 25 Feb 00:00:00 MSK 2017
to_insert = 'INSERT INTO test.graphite VALUES '
expected_unmerged = ''
for i in range(384):
to_insert += "('five_min.count', {v}, {t}, toDate({t}), 1), ".format(
v=1, t=1487970000+(i*300)
)
to_insert += "('five_min.max', {v}, {t}, toDate({t}), 1), ".format(
v=i, t=1487970000+(i*300)
)
expected_unmerged += ("five_min.count\t{v1}\t{t}\n"
"five_min.max\t{v2}\t{t}\n").format(
v1=1, v2=i,
t=1487970000+(i*300)
)
q(to_insert)
assert TSV(q('SELECT metric, value, timestamp FROM test.graphite'
' ORDER BY (timestamp, metric)')) == TSV(expected_unmerged)
q('OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL')
expected_merged = '''
five_min.count 48 1487970000 2017-02-25 1
five_min.count 48 1487984400 2017-02-25 1
five_min.count 48 1487998800 2017-02-25 1
five_min.count 48 1488013200 2017-02-25 1
five_min.count 48 1488027600 2017-02-25 1
five_min.count 48 1488042000 2017-02-25 1
five_min.count 48 1488056400 2017-02-26 1
five_min.count 48 1488070800 2017-02-26 1
five_min.max 47 1487970000 2017-02-25 1
five_min.max 95 1487984400 2017-02-25 1
five_min.max 143 1487998800 2017-02-25 1
five_min.max 191 1488013200 2017-02-25 1
five_min.max 239 1488027600 2017-02-25 1
five_min.max 287 1488042000 2017-02-25 1
five_min.max 335 1488056400 2017-02-26 1
five_min.max 383 1488070800 2017-02-26 1
'''
assert TSV(q('SELECT * FROM test.graphite'
' ORDER BY (metric, timestamp)')) == TSV(expected_merged)
def test_combined_rules_with_default(graphite_table):
q('''
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree('graphite_rollup_with_default')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=1;
''')
# 1487970000 ~ Sat 25 Feb 00:00:00 MSK 2017
to_insert = 'INSERT INTO test.graphite VALUES '
expected_unmerged = ''
for i in range(100):
to_insert += "('top_level.count', {v}, {t}, toDate({t}), 1), ".format(
v=1, t=1487970000+(i*60)
)
to_insert += "('top_level.max', {v}, {t}, toDate({t}), 1), ".format(
v=i, t=1487970000+(i*60)
)
expected_unmerged += ("top_level.count\t{v1}\t{t}\n"
"top_level.max\t{v2}\t{t}\n").format(
v1=1, v2=i,
t=1487970000+(i*60)
)
q(to_insert)
assert TSV(q('SELECT metric, value, timestamp FROM test.graphite'
' ORDER BY (timestamp, metric)')) == TSV(expected_unmerged)
q('OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL')
expected_merged = '''
top_level.count 10 1487970000 2017-02-25 1
top_level.count 10 1487970600 2017-02-25 1
top_level.count 10 1487971200 2017-02-25 1
top_level.count 10 1487971800 2017-02-25 1
top_level.count 10 1487972400 2017-02-25 1
top_level.count 10 1487973000 2017-02-25 1
top_level.count 10 1487973600 2017-02-25 1
top_level.count 10 1487974200 2017-02-25 1
top_level.count 10 1487974800 2017-02-25 1
top_level.count 10 1487975400 2017-02-25 1
top_level.max 9 1487970000 2017-02-25 1
top_level.max 19 1487970600 2017-02-25 1
top_level.max 29 1487971200 2017-02-25 1
top_level.max 39 1487971800 2017-02-25 1
top_level.max 49 1487972400 2017-02-25 1
top_level.max 59 1487973000 2017-02-25 1
top_level.max 69 1487973600 2017-02-25 1
top_level.max 79 1487974200 2017-02-25 1
top_level.max 89 1487974800 2017-02-25 1
top_level.max 99 1487975400 2017-02-25 1
'''
assert TSV(q('SELECT * FROM test.graphite'
' ORDER BY (metric, timestamp)')) == TSV(expected_merged)
def test_broken_partial_rollup(graphite_table):
q('''
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree('graphite_rollup_broken')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=1;
''')
to_insert = '''\
one_min.x1 100 1000000000 2001-09-09 1
zzzzzzzz 100 1000000001 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
q('INSERT INTO test.graphite FORMAT TSV', to_insert)
expected = '''\
one_min.x1 100 1000000000 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
result = q('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
''')
assert TSV(result) == TSV(expected)

View File

@ -20,7 +20,7 @@ SELECT \n a, \n b\nFROM \n(\n SELECT \n toUInt64(sum(id) AS b) A
3 3
SELECT \n date, \n id, \n name, \n value\nFROM \n(\n SELECT \n date, \n name, \n value, \n min(id) AS id\n FROM test.test \n GROUP BY \n date, \n name, \n value\n HAVING id = 1\n) \nWHERE id = 1
2000-01-01 1 test string 1 1
SELECT \n a, \n b\nFROM \n(\n SELECT \n toUInt64(sum(id) AS b) AS a, \n b\n FROM test.test AS table_alias \n HAVING b = 3\n) AS outer_table_alias \nWHERE outer_table_alias.b = 3
SELECT \n a, \n b\nFROM \n(\n SELECT \n toUInt64(sum(id) AS b) AS a, \n b\n FROM test.test AS table_alias \n HAVING b = 3\n) AS outer_table_alias \nWHERE b = 3
3 3
SELECT \n date, \n id, \n name, \n value\nFROM \n(\n SELECT \n date, \n id, \n name, \n value\n FROM test.test \n WHERE id = 1\n) \nWHERE id = 1
2000-01-01 1 test string 1 1
@ -32,9 +32,9 @@ SELECT \n date, \n id, \n name, \n value\nFROM \n(\n SELECT \n
2000-01-01 1 test string 1 1
SELECT \n date, \n id, \n name, \n value\nFROM \n(\n SELECT \n date, \n id, \n name, \n value\n FROM \n (\n SELECT \n date, \n id, \n name, \n value\n FROM test.test \n WHERE id = 1\n ) \n WHERE id = 1\n) \nWHERE id = 1
2000-01-01 1 test string 1 1
SELECT \n date, \n id, \n name, \n value\nFROM \n(\n SELECT \n date, \n id, \n name, \n value\n FROM test.test \n WHERE id = 1\n) AS b \nWHERE b.id = 1
SELECT \n date, \n id, \n name, \n value\nFROM \n(\n SELECT \n date, \n id, \n name, \n value\n FROM test.test \n WHERE id = 1\n) AS b \nWHERE id = 1
2000-01-01 1 test string 1 1
SELECT \n date, \n id, \n name, \n value\nFROM \n(\n SELECT \n date, \n id, \n name, \n value\n FROM \n (\n SELECT \n date, \n id, \n name, \n value\n FROM test.test \n WHERE id = 1\n ) AS a \n WHERE id = 1\n) AS b \nWHERE b.id = 1
SELECT \n date, \n id, \n name, \n value\nFROM \n(\n SELECT \n date, \n id, \n name, \n value\n FROM \n (\n SELECT \n date, \n id, \n name, \n value\n FROM test.test \n WHERE id = 1\n ) AS a \n WHERE id = 1\n) AS b \nWHERE id = 1
2000-01-01 1 test string 1 1
SELECT \n id, \n date, \n value\nFROM \n(\n SELECT \n id, \n date, \n min(value) AS value\n FROM test.test \n WHERE id = 1\n GROUP BY \n id, \n date\n) \nWHERE id = 1
1 2000-01-01 1
@ -45,11 +45,11 @@ SELECT \n date, \n id, \n name, \n value, \n date, \n name, \n
2000-01-01 1 test string 1 1 2000-01-01 test string 1 1
SELECT \n id, \n date, \n name, \n value\nFROM \n(\n SELECT toInt8(1) AS id\n) \nANY LEFT JOIN test.test USING (id)\nWHERE value = 1
1 2000-01-01 test string 1 1
SELECT b.value\nFROM \n(\n SELECT toInt8(1) AS id\n) \nANY LEFT JOIN test.test AS b USING (id)\nWHERE value = 1
SELECT value\nFROM \n(\n SELECT toInt8(1) AS id\n) \nANY LEFT JOIN test.test AS b USING (id)\nWHERE value = 1
1
SELECT \n date, \n id, \n name, \n value\nFROM \n(\n SELECT \n date, \n id, \n name, \n value, \n date, \n name, \n value\n FROM \n (\n SELECT \n date, \n id, \n name, \n value\n FROM test.test \n WHERE id = 1\n ) \n ANY LEFT JOIN \n (\n SELECT *\n FROM test.test \n WHERE id = 1\n ) USING (id)\n WHERE id = 1\n) \nWHERE id = 1
2000-01-01 1 test string 1 1
SELECT \n date, \n id, \n name, \n value, \n `b.date`, \n `b.name`, \n `b.value`\nFROM \n(\n SELECT \n date, \n id, \n name, \n value\n FROM test.test \n) \nANY LEFT JOIN \n(\n SELECT *\n FROM test.test \n WHERE id = 1\n) AS b USING (id)\nWHERE b.id = 1
SELECT \n date, \n id, \n name, \n value, \n b.date, \n b.name, \n b.value\nFROM \n(\n SELECT \n date, \n id, \n name, \n value\n FROM test.test \n) \nANY LEFT JOIN \n(\n SELECT *\n FROM test.test \n WHERE id = 1\n) AS b USING (id)\nWHERE b.id = 1
2000-01-01 1 test string 1 1 2000-01-01 test string 1 1
SELECT \n id, \n date, \n name, \n value\nFROM \n(\n SELECT \n toInt8(1) AS id, \n toDate(\'2000-01-01\') AS date\n FROM system.numbers \n LIMIT 1\n) \nANY LEFT JOIN \n(\n SELECT *\n FROM test.test \n WHERE date = toDate(\'2000-01-01\')\n) AS b USING (date, id)\nWHERE b.date = toDate(\'2000-01-01\')
1 2000-01-01 test string 1 1

View File

@ -1,3 +1,5 @@
0 0
0 0
cross
1 1 1 1
1 1 1 2
@ -67,7 +69,7 @@ Explain ParsedAST (children 1)\n SelectWithUnionQuery (children 1)\n Expression
Explain ParsedAST (children 1)\n SelectWithUnionQuery (children 1)\n ExpressionList (children 1)\n SelectQuery (children 2)\n ExpressionList (children 1)\n Asterisk\n TablesInSelectQuery (children 2)\n TablesInSelectQueryElement (children 1)\n TableExpression (children 1)\n Identifier t1 (alias x)\n TablesInSelectQueryElement (children 2)\n TableJoin (children 1)\n Function and (children 1)\n ExpressionList (children 2)\n Function equals (children 1)\n ExpressionList (children 2)\n Identifier x.a\n Identifier y.a\n Function equals (children 1)\n ExpressionList (children 2)\n Identifier x.b\n Identifier y.b\n TableExpression (children 1)\n Identifier t1 (alias y)\n
cross one table expr
Explain ParsedAST (children 1)\n SelectWithUnionQuery (children 1)\n ExpressionList (children 1)\n SelectQuery (children 3)\n ExpressionList (children 1)\n Asterisk\n TablesInSelectQuery (children 2)\n TablesInSelectQueryElement (children 1)\n TableExpression (children 1)\n Identifier t1\n TablesInSelectQueryElement (children 2)\n TableExpression (children 1)\n Identifier t2\n TableJoin\n Function equals (children 1)\n ExpressionList (children 2)\n Identifier t1.a\n Identifier t1.b\n
Explain ParsedAST (children 1)\n SelectWithUnionQuery (children 1)\n ExpressionList (children 1)\n SelectQuery (children 3)\n ExpressionList (children 1)\n Asterisk\n TablesInSelectQuery (children 2)\n TablesInSelectQueryElement (children 1)\n TableExpression (children 1)\n Identifier t1\n TablesInSelectQueryElement (children 2)\n TableJoin\n TableExpression (children 1)\n Identifier t2\n Function equals (children 1)\n ExpressionList (children 2)\n Identifier t1.a\n Identifier t1.b\n
Explain ParsedAST (children 1)\n SelectWithUnionQuery (children 1)\n ExpressionList (children 1)\n SelectQuery (children 3)\n ExpressionList (children 1)\n Asterisk\n TablesInSelectQuery (children 2)\n TablesInSelectQueryElement (children 1)\n TableExpression (children 1)\n Identifier t1\n TablesInSelectQueryElement (children 2)\n TableExpression (children 1)\n Identifier t2\n TableJoin\n Function equals (children 1)\n ExpressionList (children 2)\n Identifier t1.a\n Identifier t1.b\n
cross multiple ands
Explain ParsedAST (children 1)\n SelectWithUnionQuery (children 1)\n ExpressionList (children 1)\n SelectQuery (children 3)\n ExpressionList (children 1)\n Asterisk\n TablesInSelectQuery (children 2)\n TablesInSelectQueryElement (children 1)\n TableExpression (children 1)\n Identifier t1\n TablesInSelectQueryElement (children 2)\n TableExpression (children 1)\n Identifier t2\n TableJoin\n Function and (children 1)\n ExpressionList (children 2)\n Function equals (children 1)\n ExpressionList (children 2)\n Identifier t1.a\n Identifier t2.a\n Function equals (children 1)\n ExpressionList (children 2)\n Identifier t1.b\n Identifier t2.b\n
Explain ParsedAST (children 1)\n SelectWithUnionQuery (children 1)\n ExpressionList (children 1)\n SelectQuery (children 2)\n ExpressionList (children 1)\n Asterisk\n TablesInSelectQuery (children 2)\n TablesInSelectQueryElement (children 1)\n TableExpression (children 1)\n Identifier t1\n TablesInSelectQueryElement (children 2)\n TableJoin (children 1)\n Function and (children 1)\n ExpressionList (children 2)\n Function equals (children 1)\n ExpressionList (children 2)\n Identifier t1.a\n Identifier t2.a\n Function equals (children 1)\n ExpressionList (children 2)\n Identifier t1.b\n Identifier t2.b\n TableExpression (children 1)\n Identifier t2\n

View File

@ -1,6 +1,11 @@
SET enable_debug_queries = 1;
USE test;
set allow_experimental_cross_to_join_conversion = 0;
select * from system.one cross join system.one;
set allow_experimental_cross_to_join_conversion = 1;
select * from system.one cross join system.one;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;

View File

@ -0,0 +1,45 @@
Row 1:
──────
t.a: 1
s.b: 1
s.a: 1
s.b: 1
y.a: 1
y.b: 1
Row 2:
──────
t.a: 2
s.b: 0
s.a: 0
s.b: 0
y.a: 0
y.b: 0
┌─t.a─┬─s.b─┬─s.a─┬─s.b─┬─y.a─┬─y.b─┐
│ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │
│ 2 │ 0 │ 0 │ 0 │ 0 │ 0 │
└─────┴─────┴─────┴─────┴─────┴─────┘
┌─t_a─┐
│ 1 │
│ 2 │
└─────┘
┌─t.a─┬─s_a─┐
│ 1 │ 1 │
│ 2 │ 0 │
└─────┴─────┘
┌─t.a─┬─t.a─┬─t_b─┐
│ 1 │ 1 │ 1 │
│ 2 │ 2 │ 2 │
└─────┴─────┴─────┘
┌─s.a─┬─s.a─┬─s_b─┬─s_b─┐
│ 1 │ 1 │ 1 │ 1 │
│ 0 │ 0 │ 0 │ 0 │
└─────┴─────┴─────┴─────┘
┌─y.a─┬─y.a─┬─y_b─┬─y_b─┐
│ 1 │ 1 │ 1 │ 1 │
│ 0 │ 0 │ 0 │ 0 │
└─────┴─────┴─────┴─────┘
┌─t_a─┬─t_a─┬─s_a─┬─s_a─┬─y_a─┬─y_a─┐
│ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │
│ 2 │ 2 │ 0 │ 0 │ 0 │ 0 │
└─────┴─────┴─────┴─────┴─────┴─────┘

View File

@ -0,0 +1,48 @@
use test;
drop table if exists t;
drop table if exists s;
drop table if exists y;
create table t(a Int64, b Int64) engine = TinyLog;
create table s(a Int64, b Int64) engine = TinyLog;
create table y(a Int64, b Int64) engine = TinyLog;
insert into t values (1,1), (2,2);
insert into s values (1,1);
insert into y values (1,1);
select t.a, s.b, s.a, s.b, y.a, y.b from t
left join s on (t.a = s.a and t.b = s.b)
left join y on (y.a = s.a and y.b = s.b) format Vertical;
select t.a, s.b, s.a, s.b, y.a, y.b from t
left join s on (t.a = s.a and s.b = t.b)
left join y on (y.a = s.a and y.b = s.b) format PrettyCompactNoEscapes;
select t.a as t_a from t
left join s on s.a = t_a format PrettyCompactNoEscapes;
select t.a, s.a as s_a from t
left join s on s.a = t.a
left join y on y.b = s.b format PrettyCompactNoEscapes;
select t.a, t.a, t.b as t_b from t
left join s on t.a = s.a
left join y on y.b = s.b format PrettyCompactNoEscapes;
select s.a, s.a, s.b as s_b, s.b from t
left join s on s.a = t.a
left join y on s.b = y.b format PrettyCompactNoEscapes;
select y.a, y.a, y.b as y_b, y.b from t
left join s on s.a = t.a
left join y on y.b = s.b format PrettyCompactNoEscapes;
select t.a, t.a as t_a, s.a, s.a as s_a, y.a, y.a as y_a from t
left join s on t.a = s.a
left join y on y.b = s.b format PrettyCompactNoEscapes;
drop table t;
drop table s;
drop table y;

View File

@ -0,0 +1,30 @@
1 a
-
2 b
-
--
1 a
-
2 b
-
--
1 a
-
2 b
-
----
1 a
-
2 b
-
--
1 a
-
2 b
-
--
1 a
-
2 b
-
----

View File

@ -0,0 +1,69 @@
SET allow_experimental_data_skipping_indices=1;
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b Nullable(String), INDEX b_index b TYPE set(0) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a');
insert into test.nullable_set_index values (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '--';
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b Nullable(String), INDEX b_index b TYPE set(1) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a');
insert into test.nullable_set_index values (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '--';
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b Nullable(String), INDEX b_index b TYPE set(0) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a'), (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '----';
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b LowCardinality(Nullable(String)), INDEX b_index b TYPE set(0) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a');
insert into test.nullable_set_index values (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '--';
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b LowCardinality(Nullable(String)), INDEX b_index b TYPE set(1) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a');
insert into test.nullable_set_index values (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '--';
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b LowCardinality(Nullable(String)), INDEX b_index b TYPE set(0) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a'), (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '----';
drop table if exists test.nullable_set_index;

View File

@ -0,0 +1,20 @@
SET allow_experimental_data_skipping_indices=1;
drop table if exists test.null_lc_set_index;
CREATE TABLE test.null_lc_set_index (
timestamp DateTime,
action LowCardinality(Nullable(String)),
user LowCardinality(Nullable(String)),
INDEX test_user_idx (user) TYPE set(0) GRANULARITY 8192
) ENGINE=MergeTree
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp, action, cityHash64(user))
SAMPLE BY cityHash64(user);
INSERT INTO test.null_lc_set_index VALUES (1550883010, 'subscribe', 'alice');
INSERT INTO test.null_lc_set_index VALUES (1550883020, 'follow', 'bob');
SELECT action, user FROM test.null_lc_set_index WHERE user = 'alice';
drop table if exists test.null_lc_set_index;

View File

@ -145,11 +145,11 @@ SELECT arrayEnumerateUniqRanked(); -- { serverError 42 }
SELECT arrayEnumerateUniqRanked([]);
SELECT arrayEnumerateUniqRanked(1); -- { serverError 36 }
SELECT arrayEnumerateUniqRanked(2,[]); -- { serverError 36 }
SELECT arrayEnumerateUniqRanked(2,[],2); -- { serverError 190 }
SELECT arrayEnumerateUniqRanked(2,[],2); -- { serverError 36 }
SELECT arrayEnumerateUniqRanked(2,[],[]); -- { serverError 36 }
SELECT arrayEnumerateUniqRanked(2,[],[],3); -- { serverError 190 }
SELECT arrayEnumerateUniqRanked([],2); -- { serverError 190 }
SELECT arrayEnumerateUniqRanked([],2,[]); -- { serverError 190 }
SELECT arrayEnumerateUniqRanked(2,[],[],3); -- { serverError 36 }
SELECT arrayEnumerateUniqRanked([],2); -- { serverError 36 }
SELECT arrayEnumerateUniqRanked([],2,[]); -- { serverError 36 }
SELECT arrayEnumerateUniqRanked(0,[],0); -- { serverError 36 }
SELECT arrayEnumerateUniqRanked(0,0,0); -- { serverError 36 }
SELECT arrayEnumerateUniqRanked(1,1,1); -- { serverError 36 }
@ -170,5 +170,13 @@ SELECT arrayEnumerateUniqRanked([1,2], 1, 2); -- { serverError 36 }
SELECT arrayEnumerateUniqRanked([1,2], 1, 3, 4, 5); -- { serverError 36 }
SELECT arrayEnumerateUniqRanked([1,2], 1, 3, [4], 5); -- { serverError 36 }
SELECT arrayEnumerateDenseRanked([[[[[[[[[[42]]]]]]]]]]);
SELECT arrayEnumerateUniqRanked('wat', [1,2]); -- { serverError 48 }
SELECT arrayEnumerateUniqRanked(1, [1,2], 'boom'); -- { serverError 48 }
SELECT arrayEnumerateUniqRanked('wat', [1,2]); -- { serverError 170 }
SELECT arrayEnumerateUniqRanked(1, [1,2], 'boom'); -- { serverError 170 }
SELECT arrayEnumerateDenseRanked(['\0'], -8363126); -- { serverError 170 }
SELECT arrayEnumerateDenseRanked(-10, ['\0'], -8363126); -- { serverError 170 }
SELECT arrayEnumerateDenseRanked(1, ['\0'], -8363126); -- { serverError 170 }
SELECT arrayEnumerateDenseRanked(-101, ['\0']); -- { serverError 170 }
SELECT arrayEnumerateDenseRanked(1.1, [10,20,10,30]); -- { serverError 170 }
SELECT arrayEnumerateDenseRanked([10,20,10,30], 0.4); -- { serverError 170 }
SELECT arrayEnumerateDenseRanked([10,20,10,30], 1.8); -- { serverError 170 }
SELECT arrayEnumerateUniqRanked(1, [], 1000000000); -- { serverError 36 }

View File

@ -10,20 +10,37 @@ $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.cannot_kill_query"
$CLICKHOUSE_CLIENT -q "CREATE TABLE test.cannot_kill_query (x UInt64) ENGINE = MergeTree ORDER BY x" &> /dev/null
$CLICKHOUSE_CLIENT -q "INSERT INTO test.cannot_kill_query SELECT * FROM numbers(10000000)" &> /dev/null
# This SELECT query will run for a long time. It's used as bloker for ALTER query. It will be killed with SYNC kill.
query_for_pending="SELECT count() FROM test.cannot_kill_query WHERE NOT ignore(sleep(1)) SETTINGS max_threads=1, max_block_size=1"
$CLICKHOUSE_CLIENT -q "$query_for_pending" &>/dev/null &
sleep 1 # queries should be in strict order
# This ALTER query will wait until $query_for_pending finished. Also it will block $query_to_kill.
$CLICKHOUSE_CLIENT -q "ALTER TABLE test.cannot_kill_query MODIFY COLUMN x UInt64" &>/dev/null &
sleep 1
# This SELECT query will also run for a long time. Also it's blocked by ALTER query. It will be killed with ASYNC kill.
# This is main idea which we check -- blocked queries can be killed with ASYNC kill.
query_to_kill="SELECT sum(1) FROM test.cannot_kill_query WHERE NOT ignore(sleep(1)) SETTINGS max_threads=1"
$CLICKHOUSE_CLIENT -q "$query_to_kill" &>/dev/null &
sleep 3 # just to be sure that 'KILL ...' will be executed after 'SELECT ... WHERE NOT ignore(sleep(1))'
sleep 1 # just to be sure that kill of $query_to_kill will be executed after $query_to_kill.
timeout 15 $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_to_kill' SYNC" &>/dev/null
# Kill $query_to_kill with ASYNC kill. We will check that information about KILL is not lost.
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_to_kill' ASYNC" &>/dev/null
sleep 1
# Kill $query_for_pending SYNC. This query is not blocker, so it should be killed fast.
timeout 5 $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_for_pending' SYNC" &>/dev/null
# But let's sleep a little time, just to be sure
sleep 3
# Both queries have to be killed, doesn't matter with SYNC or ASYNC kill
$CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes where query='$query_for_pending'"
$CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes where query='$query_to_kill'"
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_for_pending'" &>/dev/null & # kill pending query
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.cannot_kill_query" &>/dev/null

2
debian/changelog.in vendored
View File

@ -1,4 +1,4 @@
clickhouse (2:@VERSION_STRING@) unstable; urgency=low
clickhouse (@VERSION_STRING@) unstable; urgency=low
* Modified source code

View File

@ -32,6 +32,7 @@
- [RClickhouse](https://github.com/IMSMWU/RClickhouse)
- Java
- [clickhouse-client-java](https://github.com/VirtusAI/clickhouse-client-java)
- [clickhouse-client](https://github.com/Ecwid/clickhouse-client)
- Scala
- [clickhouse-scala-client](https://github.com/crobox/clickhouse-scala-client)
- Kotlin

View File

@ -31,5 +31,4 @@ For more information about queries related to partition manipulations, see the [
A third-party tool is available to automate this approach: [clickhouse-backup](https://github.com/AlexAkulov/clickhouse-backup).
[Original article](https://clickhouse.yandex/docs/en/operations/backup/) <!--hide-->

View File

@ -196,7 +196,7 @@ For more details, see [GraphiteMergeTree](../../operations/table_engines/graphit
The port for connecting to the server over HTTP(s).
If `https_port` is specified, [openSSL](#openssl) must be configured.
If `https_port` is specified, [openSSL](#server_settings-openssl) must be configured.
If `http_port` is specified, the openSSL configuration is ignored even if it is set.
@ -417,7 +417,7 @@ The value 0 means that you can delete all tables without any restrictions.
## merge_tree {#server_settings-merge_tree}
Fine tuning for tables in the [ MergeTree](../../operations/table_engines/mergetree.md).
Fine tuning for tables in the [MergeTree](../../operations/table_engines/mergetree.md).
For more information, see the MergeTreeSettings.h header file.
@ -430,7 +430,7 @@ For more information, see the MergeTreeSettings.h header file.
```
## openSSL
## openSSL {#server_settings-openssl}
SSL client/server configuration.
@ -609,6 +609,19 @@ Port for communicating with clients over the TCP protocol.
<tcp_port>9000</tcp_port>
```
## tcp_port_secure {#server_settings-tcp_port_secure}
Port for communicating with the clients over the secure connection by TCP protocol. Use it with [OpenSSL](#server_settings-openssl) settings.
**Possible values**
Positive integer.
**Default value**
```xml
<tcp_port_secure>9440</tcp_port_secure>
```
## tmp_path

View File

@ -175,6 +175,20 @@ Any positive integer.
**Default value**: 1048576.
## min_bytes_to_use_direct_io {#settings-min_bytes_to_use_direct_io}
The minimum data volume to be read from storage required for using of the direct I/O access to the storage disk.
ClickHouse uses this setting when selecting the data from tables. If summary storage volume of all the data to be read exceeds `min_bytes_to_use_direct_io` bytes, then ClickHouse reads the data from the storage disk with `O_DIRECT` option.
**Possible values**
Positive integer.
0 — The direct I/O is disabled.
**Default value**: 0.
## log_queries
Setting up query logging.

View File

@ -75,6 +75,13 @@ Rollup configuration structure:
```
required-columns
pattern
regexp
function
pattern
regexp
age + precision
...
pattern
regexp
function
@ -88,15 +95,20 @@ default
...
```
When processing a row, ClickHouse checks the rules in the `pattern` section. If the metric name matches the `regexp`, the rules from the `pattern`section are applied; otherwise, the rules from the `default` section are used.
**Important:** The order of patterns should be next:
The rules are defined with fields `function` and `age + precision`.
1. Patterns *without* `function` *or* `retention`.
1. Patterns *with* both `function` *and* `retention`.
1. Pattern `dafault`.
When processing a row, ClickHouse checks the rules in the `pattern` sections. Each of `pattern` (including `default`) sections could contain `function` parameter for aggregation, `retention` parameters or both. If the metric name matches the `regexp`, the rules from the `pattern` section (or sections) are applied; otherwise, the rules from the `default` section are used.
Fields for `pattern` and `default` sections:
- `regexp` A pattern for the metric name.
- `age` The minimum age of the data in seconds.
- `precision` How precisely to define the age of the data in seconds.
- `precision` How precisely to define the age of the data in seconds. Should be a divisor for 86400 (seconds in a day).
- `function` The name of the aggregating function to apply to data whose age falls within the range `[age, age + precision]`.
The `required-columns`:

View File

@ -9,38 +9,38 @@ Kafka lets you:
- Process streams as they become available.
Old format:
## Creating a Table {#table_engine-kafka-creating-a-table}
```
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_skip_broken_messages = <0|1>]
```
New format:
```
Kafka SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic1,topic2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
kafka_schema = '',
kafka_num_consumers = 2
```
Required parameters:
- `kafka_broker_list` A comma-separated list of brokers (`localhost:9092`).
- `kafka_topic_list` A list of Kafka topics (`my_topic`).
- `kafka_group_name` A group of Kafka consumers (`group1`). Reading margins are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.
- `kafka_format` Message format. Uses the same notation as the SQL ` FORMAT` function, such as ` JSONEachRow`. For more information, see the "Formats" section.
- `kafka_broker_list` A comma-separated list of brokers (for example, `localhost:9092`).
- `kafka_topic_list` A list of Kafka topics.
- `kafka_group_name` A group of Kafka consumers. Reading margins are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.
- `kafka_format` Message format. Uses the same notation as the SQL `FORMAT` function, such as ` JSONEachRow`. For more information, see the [Formats](../../interfaces/formats.md) section.
Optional parameters:
- `kafka_row_delimiter` - Character-delimiter of records (rows), which ends the message.
- `kafka_schema` An optional parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_row_delimiter` Delimiter character, which ends the message.
- `kafka_schema` Parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition.
- `kafka_skip_broken_messages` Mode of Kafka messages parser. If `kafka_skip_broken_messages = 1` then the engine skips the Kafka messages (message equals a row of data) that can't be parsed.
Examples:
@ -72,6 +72,23 @@ Examples:
kafka_num_consumers = 4;
```
<details markdown="1"><summary>Deprecated Method for Creating a Table</summary>
!!! attention
Do not use this method in new projects and, if possible, switch the old projects to the method described above.
```
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])
```
</details>
## Description
The delivered messages are tracked automatically, so each message in a group is only counted once. If you want to get the data twice, then create a copy of the table with another group name.
Groups are flexible and synced on the cluster. For instance, if you have 10 topics and 5 copies of a table in a cluster, then each copy gets 2 topics. If the number of copies changes, the topics are redistributed across the copies automatically. Read more about this at [http://kafka.apache.org/intro](http://kafka.apache.org/intro).

View File

@ -70,6 +70,8 @@ For a description of request parameters, see [request description](../../query_l
- `SETTINGS` — Additional parameters that control the behavior of the `MergeTree`:
- `index_granularity` — The granularity of an index. The number of data rows between the "marks" of an index. By default, 8192. The list of all available parameters you can see in [MergeTreeSettings.h](https://github.com/yandex/ClickHouse/blob/master/dbms/src/Storages/MergeTree/MergeTreeSettings.h).
- `min_merge_bytes_to_use_direct_io` — The minimum data volume for merge operation required for using of the direct I/O access to the storage disk. During the merging of the data parts, ClickHouse calculates summary storage volume of all the data to be merged. If the volume exceeds `min_merge_bytes_to_use_direct_io` bytes, thеn ClickHouse reads and writes the data using direct I/O interface (`O_DIRECT` option) to the storage disk. If `min_merge_bytes_to_use_direct_io = 0`, then the direct I/O is disabled. Default value: `10 * 1024 * 1024 * 1024` bytes.
**Example of sections setting**

View File

@ -111,8 +111,8 @@ Check:
Check:
- The `tcp_port_secure` setting.
- Settings for SSL sertificates.
- The [tcp_port_secure](server_settings/settings.md#server_settings-tcp_port_secure) setting.
- Settings for [SSL sertificates](server_settings/settings.md#server_settings-openssl).
Use proper parameters while connecting. For example, use the `port_secure` parameter with `clickhouse_client`.

View File

@ -88,7 +88,7 @@ Example of settings:
</source>
```
In order for ClickHouse to access an HTTPS resource, you must [configure openSSL](../../operations/server_settings/settings.md) in the server configuration.
In order for ClickHouse to access an HTTPS resource, you must [configure openSSL](../../operations/server_settings/settings.md#server_settings-openssl) in the server configuration.
Setting fields:

View File

@ -72,6 +72,6 @@ The `remote` table function can be useful in the following cases:
If the user is not specified, `default` is used.
If the password is not specified, an empty password is used.
`remoteSecure` - same as `remote` but with secured connection. Default port - `tcp_port_secure` from config or 9440.
`remoteSecure` - same as `remote` but with secured connection. Default port — [tcp_port_secure](../../operations/server_settings/settings.md#server_settings-tcp_port_secure) from config or 9440.
[Original article](https://clickhouse.yandex/docs/en/query_language/table_functions/remote/) <!--hide-->

View File

@ -14,7 +14,6 @@ ClickHouse может принимать (`INSERT`) и отдавать (`SELECT
[CSVWithNames](#csvwithnames) | ✔ | ✔ |
[Values](#values) | ✔ | ✔ |
[Vertical](#vertical) | ✗ | ✔ |
[VerticalRaw](#verticalraw) | ✗ | ✔ |
[JSON](#json) | ✗ | ✔ |
[JSONCompact](#jsoncompact) | ✗ | ✔ |
[JSONEachRow](#jsoneachrow) | ✔ | ✔ |
@ -354,10 +353,22 @@ SELECT * FROM t_null
└───┴──────┘
```
В форматах `Pretty*` строки выводятся без экранирования. Ниже приведен пример для формата [PrettyCompact](#prettycompact):
``` sql
SELECT 'String with \'quotes\' and \t character' AS Escaping_test
```
```
┌─Escaping_test────────────────────────┐
│ String with 'quotes' and character │
└──────────────────────────────────────┘
```
Для защиты от вываливания слишком большого количества данных в терминал, выводится только первые 10 000 строк. Если строк больше или равно 10 000, то будет написано "Showed first 10 000."
Этот формат подходит только для вывода результата выполнения запроса, но не для парсинга (приёма данных для вставки в таблицу).
Формат Pretty поддерживает вывод тотальных значений (при использовании WITH TOTALS) и экстремальных значений (при настройке extremes выставленной в 1). В этих случаях, после основных данных выводятся тотальные значения, и экстремальные значения, в отдельных табличках. Пример (показан для формата PrettyCompact):
Формат `Pretty` поддерживает вывод тотальных значений (при использовании WITH TOTALS) и экстремальных значений (при настройке extremes выставленной в 1). В этих случаях, после основных данных выводятся тотальные значения, и экстремальные значения, в отдельных табличках. Пример (показан для формата [PrettyCompact](#prettycompact)):
``` sql
SELECT EventDate, count() AS c FROM test.hits GROUP BY EventDate WITH TOTALS ORDER BY EventDate FORMAT PrettyCompact
@ -388,7 +399,7 @@ Extremes:
## PrettyCompact {#prettycompact}
Отличается от `Pretty` тем, что не рисуется сетка между строками - результат более компактный.
Отличается от [Pretty](#pretty) тем, что не рисуется сетка между строками - результат более компактный.
Этот формат используется по умолчанию в клиенте командной строки в интерактивном режиме.
## PrettyCompactMonoBlock {#prettycompactmonoblock}
@ -433,6 +444,7 @@ FixedString представлены просто как последовате
Array представлены как длина в формате varint (unsigned [LEB128](https://en.wikipedia.org/wiki/LEB128)), а затем элементы массива, подряд.
Для поддержки [NULL](../query_language/syntax.md#null-literal) перед каждым значением типа [Nullable](../data_types/nullable.md
## Values
Выводит каждую строку в скобках. Строки разделены запятыми. После последней строки запятой нет. Значения внутри скобок также разделены запятыми. Числа выводятся в десятичном виде без кавычек. Массивы выводятся в квадратных скобках. Строки, даты, даты-с-временем выводятся в кавычках. Правила экранирования и особенности парсинга аналогичны формату [TabSeparated](#tabseparated). При форматировании, лишние пробелы не ставятся, а при парсинге - допустимы и пропускаются (за исключением пробелов внутри значений типа массив, которые недопустимы). [NULL](../query_language/syntax.md) представляется как `NULL`.
@ -459,34 +471,20 @@ x: 1
y: ᴺᵁᴸᴸ
```
Этот формат подходит только для вывода результата выполнения запроса, но не для парсинга (приёма данных для вставки в таблицу).
В формате `Vertical` строки выводятся без экранирования. Например:
## VerticalRaw {#verticalraw}
Отличается от формата `Vertical` тем, что строки выводятся без экранирования.
Этот формат подходит только для вывода результата выполнения запроса, но не для парсинга (приёма данных для вставки в таблицу).
Примеры:
``` sql
SELECT 'string with \'quotes\' and \t with some special \n characters' AS test FORMAT Vertical
```
:) SHOW CREATE TABLE geonames FORMAT VerticalRaw;
Row 1:
──────
statement: CREATE TABLE default.geonames ( geonameid UInt32, date Date DEFAULT CAST('2017-12-08' AS Date)) ENGINE = MergeTree(date, geonameid, 8192)
:) SELECT 'string with \'quotes\' and \t with some special \n characters' AS test FORMAT VerticalRaw;
```
Row 1:
──────
test: string with 'quotes' and with some special
test: string with 'quotes' and with some special
characters
```
Для сравнения - формат Vertical:
```
:) SELECT 'string with \'quotes\' and \t with some special \n characters' AS test FORMAT Vertical;
Row 1:
──────
test: string with \'quotes\' and \t with some special \n characters
```
Этот формат подходит только для вывода результата выполнения запроса, но не для парсинга (приёма данных для вставки в таблицу).
## XML {#xml}

View File

@ -72,12 +72,19 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
## Конфигурация rollup
Настройки для прореживания данных задаются параметром [graphite_rollup](../server_settings/settings.md#server_settings-graphite_rollup) Имя параметра может быть любым. Можно создать несколько конфигураций и использовать их для разных таблиц.
Настройки для прореживания данных задаются параметром [graphite_rollup](../server_settings/settings.md#server_settings-graphite_rollup). Имя параметра может быть любым. Можно создать несколько конфигураций и использовать их для разных таблиц.
Структура конфигурации rollup:
```
required-columns
pattern
regexp
function
pattern
regexp
age + precision
...
pattern
regexp
function
@ -91,15 +98,19 @@ default
...
```
При обработке строки ClickHouse проверяет правила в разделе `pattern`. Если имя метрики соответствует шаблону `regexp`, то применяются правила из раздела `pattern`, в противном случае из раздела `default`.
**Важно**: порядок разделов `pattern` должен быть следующим:
Правила определяются с помощью полей `function` и `age + precision`.
1. Разделы *без* параметра `function` *или* `retention`.
1. Разделы *с* параметрами `function` *и* `retention`.
1. Раздел `default`.
Поля для разделов `pattenrn` и `default`:
При обработке строки ClickHouse проверяет правила в разделах `pattern`. Каждый из разделов `pattern` (включая `default`) может содержать параметр `function` для аггрегации, правила `retention` для прореживания или оба эти параметра. Если имя метрики соответствует шаблону `regexp`, то применяются правила из раздела (или разделов) `pattern`, в противном случае из раздела `default`.
Поля для разделов `pattern` и `default`:
- `regexp` шаблон имени метрики.
- `age` минимальный возраст данных в секундах.
- `precision` точность определения возраста данных в секундах.
- `precision` точность определения возраста данных в секундах. Должен быть делителем для 86400 (количество секунд в дне).
- `function` имя агрегирующей функции, которую следует применить к данным, чей возраст оказался в интервале `[age, age + precision]`.
`required-columns`:
@ -117,6 +128,10 @@ default
<time_column_name>Time</time_column_name>
<value_column_name>Value</value_column_name>
<version_column_name>Version</version_column_name>
<pattern>
<regexp>\.count$</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>click_cost</regexp>
<function>any</function>

View File

@ -1,148 +0,0 @@
# GraphiteMergeTree
This engine is designed for rollup (thinning and aggregating/averaging) [Graphite](http://graphite.readthedocs.io/en/latest/index.html) data. It may be helpful to developers who want to use ClickHouse as a data store for Graphite.
You can use any ClickHouse table engine to store the Graphite data if you don't need rollup, but if you need a rollup use `GraphiteMergeTree`. The engine reduces the volume of storage and increases the efficiency of queries from Graphite.
The engine inherits properties from [MergeTree](mergetree.md).
## Creating a Table
```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
Path String,
Time DateTime,
Value <Numeric_type>,
Version <Numeric_type>
...
) ENGINE = GraphiteMergeTree(config_section)
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
```
For a description of request parameters, see [request description](../../query_language/create.md).
A table for the Graphite date should have the following columns:
- Column with the metric name (Graphite sensor). Data type: `String`.
- Column with the time for measuring the metric. Data type: `DateTime`.
- Column with the value of the metric. Data type: any numeric.
- Column with the version of the metric with the same name and time of measurement. Data type: any numeric.
ClickHouse saves the rows with the highest version or the last written if versions are the same. Other rows are deleted during the merge of data parts.
The names of these columns should be set in the rollup configuration.
**GraphiteMergeTree parameters**
- `config_section` — Name of the section in the configuration file, where are the rules of rollup set.
**Query clauses**
When creating a `GraphiteMergeTree` table, the same [clauses](mergetree.md) are required, as when creating a `MergeTree` table.
<details markdown="1"><summary>Deprecated Method for Creating a Table</summary>
!!! attention
Do not use this method in new projects and, if possible, switch the old projects to the method described above.
```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
EventDate Date,
Path String,
Time DateTime,
Value <Numeric_type>,
Version <Numeric_type>
...
) ENGINE [=] GraphiteMergeTree(date-column [, sampling_expression], (primary, key), index_granularity, config_section)
```
All of the parameters excepting `config_section` have the same meaning as in `MergeTree`.
- `config_section` — Name of the section in the configuration file, where are the rules of rollup set.
</details>
## Rollup configuration
The settings for rollup are defined by the [graphite_rollup](../server_settings/settings.md) parameter in the server configuration. The name of the parameter could be any. You can create several configurations and use them for different tables.
Rollup configuration structure:
```
required-columns
pattern
regexp
function
age + precision
...
pattern
...
default
function
age + precision
...
```
When processing a row, ClickHouse checks the rules in the `pattern` section. If the metric name matches the `regexp`, the rules from the `pattern`section are applied; otherwise, the rules from the `default` section are used.
The rules are defined with fields `function` and `age + precision`.
Fields for `pattern` and `default` sections:
- `regexp` A pattern for the metric name.
- `age` The minimum age of the data in seconds.
- `precision` How precisely to define the age of the data in seconds.
- `function` The name of the aggregating function to apply to data whose age falls within the range `[age, age + precision]`.
The `required-columns`:
- `path_column_name` — Column with the metric name (Graphite sensor).
- `time_column_name` — Column with the time for measuring the metric.
- `value_column_name` — Column with the value of the metric at the time set in `time_column_name`.
- `version_column_name` — Column with the version timestamp of the metric with the same name and time remains in the database.
Example of settings:
```xml
<graphite_rollup>
<path_column_name>Path</path_column_name>
<time_column_name>Time</time_column_name>
<value_column_name>Value</value_column_name>
<version_column_name>Version</version_column_name>
<pattern>
<regexp>click_cost</regexp>
<function>any</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<default>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup>
```
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/graphitemergetree/) <!--hide-->

View File

@ -0,0 +1 @@
../../../en/operations/table_engines/graphitemergetree.md

View File

@ -1,235 +0,0 @@
# MergeTree {#table_engines-mergetree}
The `MergeTree` engine and other engines of this family (`*MergeTree`) are the most robust ClickHousе table engines.
The basic idea for `MergeTree` engines family is the following. When you have tremendous amount of a data that should be inserted into the table, you should write them quickly part by part and then merge parts by some rules in background. This method is much more efficient than constantly rewriting data in the storage at the insert.
Main features:
- Stores data sorted by primary key.
This allows you to create a small sparse index that helps find data faster.
- This allows you to use partitions if the [partitioning key](custom_partitioning_key.md) is specified.
ClickHouse supports certain operations with partitions that are more effective than general operations on the same data with the same result. ClickHouse also automatically cuts off the partition data where the partitioning key is specified in the query. This also increases the query performance.
- Data replication support.
The family of `ReplicatedMergeTree` tables is used for this. For more information, see the [Data replication](replication.md) section.
- Data sampling support.
If necessary, you can set the data sampling method in the table.
!!! info
The [Merge](merge.md) engine does not belong to the `*MergeTree` family.
## Creating a Table
```
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = MergeTree()
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
```
For a description of request parameters, see [request description](../../query_language/create.md).
**Query clauses**
- `ENGINE` - Name and parameters of the engine. `ENGINE = MergeTree()`. `MergeTree` engine does not have parameters.
- `PARTITION BY` — The [partitioning key](custom_partitioning_key.md).
For partitioning by month, use the `toYYYYMM(date_column)` expression, where `date_column` is a column with a date of the type [Date](../../data_types/date.md). The partition names here have the `"YYYYMM"` format.
- `ORDER BY` — The sorting key.
A tuple of columns or arbitrary expressions. Example: `ORDER BY (CounterID, EventDate)`.
- `PRIMARY KEY` - The primary key if it [differs from the sorting key](mergetree.md).
By default the primary key is the same as the sorting key (which is specified by the `ORDER BY` clause).
Thus in most cases it is unnecessary to specify a separate `PRIMARY KEY` clause.
- `SAMPLE BY` — An expression for sampling.
If a sampling expression is used, the primary key must contain it. Example:
`SAMPLE BY intHash32(UserID) ORDER BY (CounterID, EventDate, intHash32(UserID))`.
- `SETTINGS` — Additional parameters that control the behavior of the `MergeTree`:
- `index_granularity` — The granularity of an index. The number of data rows between the "marks" of an index. By default, 8192.
**Example of sections setting**
```
ENGINE MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity=8192
```
In the example, we set partitioning by month.
We also set an expression for sampling as a hash by the user ID. This allows you to pseudorandomize the data in the table for each `CounterID` and `EventDate`. If, when selecting the data, you define a [SAMPLE](../../query_language/select.md#select-sample-clause) clause, ClickHouse will return an evenly pseudorandom data sample for a subset of users.
`index_granularity` could be omitted because 8192 is the default value.
<details markdown="1"><summary>Deprecated Method for Creating a Table</summary>
!!! attention
Do not use this method in new projects and, if possible, switch the old projects to the method described above.
```
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE [=] MergeTree(date-column [, sampling_expression], (primary, key), index_granularity)
```
**MergeTree() parameters**
- `date-column` — The name of a column of the type [Date](../../data_types/date.md). ClickHouse automatically creates partitions by month on the basis of this column. The partition names are in the `"YYYYMM"` format.
- `sampling_expression` — an expression for sampling.
- `(primary, key)` — primary key. Type — [Tuple()](../../data_types/tuple.md- `index_granularity` — The granularity of an index. The number of data rows between the "marks" of an index. The value 8192 is appropriate for most tasks.
**Example**
```
MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID)), 8192)
```
The `MergeTree` engine is configured in the same way as in the example above for the main engine configuration method.
</details>
## Data Storage
A table consists of data *parts* sorted by primary key.
When data is inserted in a table, separate data parts are created and each of them is lexicographically sorted by primary key. For example, if the primary key is `(CounterID, Date)`, the data in the part is sorted by `CounterID`, and within each `CounterID`, it is ordered by `Date`.
Data belonging to different partitions are separated into different parts. In the background, ClickHouse merges data parts for more efficient storage. Parts belonging to different partitions are not merged. The merge mechanism does not guarantee that all rows with the same primary key will be in the same data part.
For each data part, ClickHouse creates an index file that contains the primary key value for each index row ("mark"). Index row numbers are defined as `n * index_granularity`. The maximum value `n` is equal to the integer part of dividing the total number of rows by the `index_granularity`. For each column, the "marks" are also written for the same index rows as the primary key. These "marks" allow you to find the data directly in the columns.
You can use a single large table and continually add data to it in small chunks this is what the `MergeTree` engine is intended for.
## Primary Keys and Indexes in Queries
Let's take the `(CounterID, Date)` primary key. In this case, the sorting and index can be illustrated as follows:
```
Whole data: [-------------------------------------------------------------------------]
CounterID: [aaaaaaaaaaaaaaaaaabbbbcdeeeeeeeeeeeeefgggggggghhhhhhhhhiiiiiiiiikllllllll]
Date: [1111111222222233331233211111222222333211111112122222223111112223311122333]
Marks: | | | | | | | | | | |
a,1 a,2 a,3 b,3 e,2 e,3 g,1 h,2 i,1 i,3 l,3
Marks numbers: 0 1 2 3 4 5 6 7 8 9 10
```
If the data query specifies:
- `CounterID in ('a', 'h')`, the server reads the data in the ranges of marks `[0, 3)` and `[6, 8)`.
- `CounterID IN ('a', 'h') AND Date = 3`, the server reads the data in the ranges of marks `[1, 3)` and `[7, 8)`.
- `Date = 3`, the server reads the data in the range of marks `[1, 10]`.
The examples above show that it is always more effective to use an index than a full scan.
A sparse index allows extra strings to be read. When reading a single range of the primary key, up to `index_granularity * 2` extra rows in each data block can be read. In most cases, ClickHouse performance does not degrade when `index_granularity = 8192`.
Sparse indexes allow you to work with a very large number of table rows, because such indexes are always stored in the computer's RAM.
ClickHouse does not require a unique primary key. You can insert multiple rows with the same primary key.
### Selecting the Primary Key
The number of columns in the primary key is not explicitly limited. Depending on the data structure, you can include more or fewer columns in the primary key. This may:
- Improve the performance of an index.
If the primary key is `(a, b)`, then adding another column `c` will improve the performance if the following conditions are met:
- There are queries with a condition on column `c`.
- Long data ranges (several times longer than the `index_granularity`) with identical values for `(a, b)` are common. In other words, when adding another column allows you to skip quite long data ranges.
- Improve data compression.
ClickHouse sorts data by primary key, so the higher the consistency, the better the compression.
- Provide additional logic when data parts merging in the [CollapsingMergeTree](collapsingmergetree.md#table_engine-collapsingmergetree) and [SummingMergeTree](summingmergetree.md) engines.
In this case it makes sense to specify the *sorting key* that is different from the primary key.
A long primary key will negatively affect the insert performance and memory consumption, but extra columns in the primary key do not affect ClickHouse performance during `SELECT` queries.
### Choosing the Primary Key that differs from the Sorting Key
It is possible to specify the primary key (the expression, values of which are written into the index file
for each mark) that is different from the sorting key (the expression for sorting the rows in data parts).
In this case the primary key expression tuple must be a prefix of the sorting key expression tuple.
This feature is helpful when using the [SummingMergeTree](summingmergetree.md) and
[AggregatingMergeTree](aggregatingmergetree.md) table engines. In a common case when using these engines the
table has two types of columns: *dimensions* and *measures*. Typical queries aggregate values of measure
columns with arbitrary `GROUP BY` and filtering by dimensions. As SummingMergeTree and AggregatingMergeTree
aggregate rows with the same value of the sorting key, it is natural to add all dimensions to it. As a result
the key expression consists of a long list of columns and this list must be frequently updated with newly
added dimensions.
In this case it makes sense to leave only a few columns in the primary key that will provide efficient
range scans and add the remaining dimension columns to the sorting key tuple.
[ALTER of the sorting key](../../query_language/alter.md) is a
lightweight operation because when a new column is simultaneously added to the table and to the sorting key
data parts need not be changed (they remain sorted by the new sorting key expression).
### Use of Indexes and Partitions in Queries
For`SELECT` queries, ClickHouse analyzes whether an index can be used. An index can be used if the `WHERE/PREWHERE` clause has an expression (as one of the conjunction elements, or entirely) that represents an equality or inequality comparison operation, or if it has `IN` or `LIKE` with a fixed prefix on columns or expressions that are in the primary key or partitioning key, or on certain partially repetitive functions of these columns, or logical relationships of these expressions.
Thus, it is possible to quickly run queries on one or many ranges of the primary key. In this example, queries will be fast when run for a specific tracking tag; for a specific tag and date range; for a specific tag and date; for multiple tags with a date range, and so on.
Let's look at the engine configured as follows:
```
ENGINE MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate) SETTINGS index_granularity=8192
```
In this case, in queries:
``` sql
SELECT count() FROM table WHERE EventDate = toDate(now()) AND CounterID = 34
SELECT count() FROM table WHERE EventDate = toDate(now()) AND (CounterID = 34 OR CounterID = 42)
SELECT count() FROM table WHERE ((EventDate >= toDate('2014-01-01') AND EventDate <= toDate('2014-01-31')) OR EventDate = toDate('2014-05-01')) AND CounterID IN (101500, 731962, 160656) AND (CounterID = 101500 OR EventDate != toDate('2014-05-01'))
```
ClickHouse will use the primary key index to trim improper data and the monthly partitioning key to trim partitions that are in improper date ranges.
The queries above show that the index is used even for complex expressions. Reading from the table is organized so that using the index can't be slower than a full scan.
In the example below, the index can't be used.
``` sql
SELECT count() FROM table WHERE CounterID = 34 OR URL LIKE '%upyachka%'
```
To check whether ClickHouse can use the index when running a query, use the settings [force_index_by_date](../settings/settings.md#settings-force_index_by_date) and [force_primary_key](../settings/settings.md).
The key for partitioning by month allows reading only those data blocks which contain dates from the proper range. In this case, the data block may contain data for many dates (up to an entire month). Within a block, data is sorted by primary key, which might not contain the date as the first column. Because of this, using a query with only a date condition that does not specify the primary key prefix will cause more data to be read than for a single date.
## Concurrent Data Access
For concurrent table access, we use multi-versioning. In other words, when a table is simultaneously read and updated, data is read from a set of parts that is current at the time of the query. There are no lengthy locks. Inserts do not get in the way of read operations.
Reading from a table is automatically parallelized.
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/mergetree/) <!--hide-->

View File

@ -0,0 +1 @@
../../../en/operations/table_engines/mergetree.md