Merge branch 'master' into alter_table_drop_detached_part

This commit is contained in:
alexey-milovidov 2019-08-23 05:22:07 +03:00 committed by GitHub
commit 7012a421c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
106 changed files with 1202 additions and 1892 deletions

View File

@ -219,7 +219,7 @@ endif()
add_library(${ARROW_LIBRARY} ${ARROW_SRCS})
add_dependencies(${ARROW_LIBRARY} protoc)
target_include_directories(${ARROW_LIBRARY} SYSTEM PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/cpp/src ${Boost_INCLUDE_DIRS})
target_link_libraries(${ARROW_LIBRARY} PRIVATE ${DOUBLE_CONVERSION_LIBRARIES} Threads::Threads)
target_link_libraries(${ARROW_LIBRARY} PRIVATE ${DOUBLE_CONVERSION_LIBRARIES} ${PROTOBUF_LIBRARIES} Threads::Threads)
if (ARROW_WITH_LZ4)
target_link_libraries(${ARROW_LIBRARY} PRIVATE ${LZ4_LIBRARY})
endif()

View File

@ -114,6 +114,7 @@ add_headers_and_sources(dbms src/Columns)
add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Storages/LiveView)
add_headers_and_sources(dbms src/Client)
add_headers_and_sources(dbms src/Formats)
add_headers_and_sources(dbms src/Processors)

View File

@ -333,7 +333,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, external_table_functions_use_nulls, true, "If it is set to true, external table functions will implicitly use Nullable type if needed. Otherwise NULLs will be substituted with default values. Currently supported only for 'mysql' table function.") \
M(SettingBool, allow_experimental_data_skipping_indices, false, "If it is set to true, data skipping indices can be used in CREATE TABLE/ALTER TABLE queries.") \
\
M(SettingBool, experimental_use_processors, false, "Use processors pipeline.") \
M(SettingBool, experimental_use_processors, true, "Use processors pipeline.") \
\
M(SettingBool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.") \
M(SettingBool, allow_simdjson, true, "Allow using simdjson library in 'JSON*' functions if AVX2 instructions are available. If disabled rapidjson will be used.") \
@ -343,14 +343,14 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, check_query_single_value_result, true, "Return check query result as single 1/0 value") \
M(SettingBool, allow_drop_detached, false, "Allow ALTER TABLE ... DROP DETACHED PART[ITION] ... queries") \
\
M(SettingBool, allow_experimental_live_view, false, "Enable LIVE VIEW. Not mature enough.") \
M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.") \
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.") \
M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.") \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
M(SettingBool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13") \
M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.") \
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.") \
M(SettingSeconds, temporary_live_channel_timeout, DEFAULT_TEMPORARY_LIVE_CHANNEL_TIMEOUT_SEC, "Timeout after which temporary live channel is deleted.") \
M(SettingMilliseconds, alter_channel_wait_ms, DEFAULT_ALTER_LIVE_CHANNEL_WAIT_MS, "The wait time for alter channel request.") \
M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.") \
DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)

View File

@ -12,7 +12,7 @@
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/StorageValues.h>
#include <Storages/StorageLiveView.h>
#include <Storages/LiveView/StorageLiveView.h>
namespace DB
{
@ -106,8 +106,7 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get()))
{
BlockOutputStreamPtr output_ = std::make_shared<LiveViewBlockOutputStream>(*live_view);
StorageLiveView::writeIntoLiveView(*live_view, block, context, output_);
StorageLiveView::writeIntoLiveView(*live_view, block, context);
}
else
{

View File

@ -5,7 +5,6 @@
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageLiveView.h>
namespace DB
{

View File

@ -0,0 +1,259 @@
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/castColumn.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
// Implements function, giving value for column within range of given
// Example:
// | c1 |
// | 10 |
// | 20 |
// SELECT c1, neighbor(c1, 1) as c2:
// | c1 | c2 |
// | 10 | 20 |
// | 20 | 0 |
class FunctionNeighbor : public IFunction
{
public:
static constexpr auto name = "neighbor";
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionNeighbor>(context); }
FunctionNeighbor(const Context & context_) : context(context_) {}
/// Get the name of the function.
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
bool isVariadic() const override { return true; }
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return false; }
bool useDefaultImplementationForNulls() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
size_t number_of_arguments = arguments.size();
if (number_of_arguments < 2 || number_of_arguments > 3)
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + toString(number_of_arguments)
+ ", should be from 2 to 3",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
// second argument must be an integer
if (!isInteger(arguments[1]))
throw Exception(
"Illegal type " + arguments[1]->getName() + " of second argument of function " + getName() + " - should be an integer",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
else if (arguments[1]->isNullable())
throw Exception(
"Illegal type " + arguments[1]->getName() + " of second argument of function " + getName() + " - can not be Nullable",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
// check that default value column has supertype with first argument
if (number_of_arguments == 3)
{
DataTypes types = {arguments[0], arguments[2]};
try
{
return getLeastSupertype(types);
}
catch (const Exception &)
{
throw Exception(
"Illegal types of arguments (" + types[0]->getName() + ", " + types[1]->getName()
+ ")"
" of function "
+ getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
return arguments[0];
}
static void insertDefaults(const MutableColumnPtr & target, size_t row_count, ColumnPtr & default_values_column, size_t offset)
{
if (row_count == 0)
{
return;
}
if (default_values_column)
{
if (isColumnConst(*default_values_column))
{
const IColumn & constant_content = assert_cast<const ColumnConst &>(*default_values_column).getDataColumn();
for (size_t row = 0; row < row_count; ++row)
{
target->insertFrom(constant_content, 0);
}
}
else
{
target->insertRangeFrom(*default_values_column, offset, row_count);
}
}
else
{
for (size_t row = 0; row < row_count; ++row)
{
target->insertDefault();
}
}
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
const ColumnWithTypeAndName & source_column_name_and_type = block.getByPosition(arguments[0]);
const DataTypePtr & result_type = block.getByPosition(result).type;
ColumnPtr source_column = source_column_name_and_type.column;
// adjust source and default values columns to resulting data type
if (!source_column_name_and_type.type->equals(*result_type))
{
source_column = castColumn(source_column_name_and_type, result_type, context);
}
ColumnPtr default_values_column;
/// Has argument with default value: neighbor(source, offset, default)
if (arguments.size() == 3)
{
default_values_column = block.getByPosition(arguments[2]).column;
if (!block.getByPosition(arguments[2]).type->equals(*result_type))
default_values_column = castColumn(block.getByPosition(arguments[2]), result_type, context);
}
const auto & offset_structure = block.getByPosition(arguments[1]);
ColumnPtr offset_column = offset_structure.column;
auto is_constant_offset = isColumnConst(*offset_structure.column);
// since we are working with both signed and unsigned - we'll try to use Int64 for handling all of them
const DataTypePtr desired_type = std::make_shared<DataTypeInt64>();
if (!block.getByPosition(arguments[1]).type->equals(*desired_type))
{
offset_column = castColumn(offset_structure, desired_type, context);
}
if (isColumnConst(*source_column))
{
/// NOTE Inconsistency when default_values are specified.
auto column = result_type->createColumnConst(input_rows_count, (*source_column)[0]);
block.getByPosition(result).column = std::move(column);
}
else
{
auto column = result_type->createColumn();
column->reserve(input_rows_count);
// with constant offset - insertRangeFrom
if (is_constant_offset)
{
Int64 offset_value = offset_column->getInt(0);
auto offset_value_casted = static_cast<size_t>(std::abs(offset_value));
size_t default_value_count = std::min(offset_value_casted, input_rows_count);
if (offset_value > 0)
{
// insert shifted value
if (offset_value_casted <= input_rows_count)
{
column->insertRangeFrom(*source_column, offset_value_casted, input_rows_count - offset_value_casted);
}
insertDefaults(column, default_value_count, default_values_column, input_rows_count - default_value_count);
}
else if (offset_value < 0)
{
// insert defaults up to offset_value
insertDefaults(column, default_value_count, default_values_column, 0);
column->insertRangeFrom(*source_column, 0, input_rows_count - default_value_count);
}
else
{
// populate column with source values, when offset is equal to zero
column->insertRangeFrom(*source_column, 0, input_rows_count);
}
}
else
{
// with dynamic offset - handle row by row
for (size_t row = 0; row < input_rows_count; ++row)
{
Int64 offset_value = offset_column->getInt(row);
if (offset_value == 0)
{
column->insertFrom(*source_column, row);
}
else if (offset_value > 0)
{
size_t real_offset = row + offset_value;
if (real_offset > input_rows_count)
{
if (default_values_column)
{
column->insertFrom(*default_values_column, row);
}
else
{
column->insertDefault();
}
}
else
{
column->insertFrom(*source_column, real_offset);
}
}
else
{
// out of range
auto offset_value_casted = static_cast<size_t>(std::abs(offset_value));
if (offset_value_casted > row)
{
if (default_values_column)
{
column->insertFrom(*default_values_column, row);
}
else
{
column->insertDefault();
}
}
else
{
column->insertFrom(*source_column, row - offset_value_casted);
}
}
}
}
block.getByPosition(result).column = std::move(column);
}
}
private:
const Context & context;
};
void registerFunctionNeighbor(FunctionFactory & factory)
{
factory.registerFunction<FunctionNeighbor>();
}
}

View File

@ -18,6 +18,7 @@ void registerFunctionBlockSize(FunctionFactory &);
void registerFunctionBlockNumber(FunctionFactory &);
void registerFunctionRowNumberInBlock(FunctionFactory &);
void registerFunctionRowNumberInAllBlocks(FunctionFactory &);
void registerFunctionNeighbor(FunctionFactory &);
void registerFunctionSleep(FunctionFactory &);
void registerFunctionSleepEachRow(FunctionFactory &);
void registerFunctionMaterialize(FunctionFactory &);
@ -69,6 +70,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionBlockNumber(factory);
registerFunctionRowNumberInBlock(factory);
registerFunctionRowNumberInAllBlocks(factory);
registerFunctionNeighbor(factory);
registerFunctionSleep(factory);
registerFunctionSleepEachRow(factory);
registerFunctionMaterialize(factory);

View File

@ -8,9 +8,9 @@
#include <Storages/AlterCommands.h>
#include <Storages/MutationCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/LiveViewCommands.h>
#include <Storages/LiveView/LiveViewCommands.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Common/typeid_cast.h>
#include <Storages/StorageLiveView.h>
#include <algorithm>

View File

@ -1550,7 +1550,11 @@ void InterpreterSelectQuery::executeFetchColumns(
{
/// Unify streams in case they have different headers.
auto first_header = streams.at(0)->getHeader();
for (size_t i = 1; i < streams.size(); ++i)
if (first_header.columns() > 1 && first_header.has("_dummy"))
first_header.erase("_dummy");
for (size_t i = 0; i < streams.size(); ++i)
{
auto & stream = streams[i];
auto header = stream->getHeader();

View File

@ -25,6 +25,7 @@ namespace ErrorCodes
extern const int UNKNOWN_STORAGE;
extern const int UNKNOWN_TABLE;
extern const int TOO_MANY_COLUMNS;
extern const int SUPPORT_IS_DISABLED;
}
BlockInputStreamPtr InterpreterWatchQuery::executeImpl()
@ -34,6 +35,9 @@ BlockInputStreamPtr InterpreterWatchQuery::executeImpl()
BlockIO InterpreterWatchQuery::execute()
{
if (!context.getSettingsRef().allow_experimental_live_view)
throw Exception("Experimental LIVE VIEW feature is not enabled (the setting 'allow_experimental_live_view')", ErrorCodes::SUPPORT_IS_DISABLED);
BlockIO res;
const ASTWatchQuery & query = typeid_cast<const ASTWatchQuery &>(*query_ptr);
String database;

View File

@ -590,32 +590,45 @@ void PipelineExecutor::executeImpl(size_t num_threads)
for (size_t i = 0; i < num_threads; ++i)
executor_contexts.emplace_back(std::make_unique<ExecutorContext>());
auto thread_group = CurrentThread::getGroup();
using ThreadsData = std::vector<ThreadFromGlobalPool>;
ThreadsData threads;
threads.reserve(num_threads);
bool finished_flag = false;
SCOPE_EXIT(
if (!finished_flag)
{
finish();
for (auto & thread : threads)
thread.join();
}
);
addChildlessProcessorsToStack(stack);
while (!stack.empty())
{
UInt64 proc = stack.top();
stack.pop();
std::lock_guard lock(task_queue_mutex);
if (prepareProcessor(proc, stack, stack, 0, false))
while (!stack.empty())
{
auto cur_state = graph[proc].execution_state.get();
task_queue.push(cur_state);
UInt64 proc = stack.top();
stack.pop();
if (prepareProcessor(proc, stack, stack, 0, false))
{
auto cur_state = graph[proc].execution_state.get();
task_queue.push(cur_state);
}
}
}
ThreadPool pool(num_threads);
SCOPE_EXIT(
finish();
pool.wait()
);
auto thread_group = CurrentThread::getGroup();
for (size_t i = 0; i < num_threads; ++i)
{
pool.schedule([this, thread_group, thread_num = i, num_threads]
threads.emplace_back([this, thread_group, thread_num = i, num_threads]
{
/// ThreadStatus thread_status;
@ -631,7 +644,10 @@ void PipelineExecutor::executeImpl(size_t num_threads)
});
}
pool.wait();
for (auto & thread : threads)
thread.join();
finished_flag = true;
}
String PipelineExecutor::dumpPipeline() const

View File

@ -5,7 +5,7 @@ namespace DB
{
CubeTransform::CubeTransform(Block header, AggregatingTransformParamsPtr params_)
: IInflatingTransform(std::move(header), params_->getHeader())
: IAccumulatingTransform(std::move(header), params_->getHeader())
, params(std::move(params_))
, keys(params->params.keys)
{
@ -13,28 +13,45 @@ CubeTransform::CubeTransform(Block header, AggregatingTransformParamsPtr params_
throw Exception("Too many keys are used for CubeTransform.", ErrorCodes::LOGICAL_ERROR);
}
void CubeTransform::consume(Chunk chunk)
Chunk CubeTransform::merge(Chunks && chunks, bool final)
{
consumed_chunk = std::move(chunk);
auto num_rows = consumed_chunk.getNumRows();
mask = (UInt64(1) << keys.size()) - 1;
BlocksList rollup_blocks;
for (auto & chunk : chunks)
rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
current_columns = consumed_chunk.getColumns();
current_zero_columns.clear();
current_zero_columns.reserve(keys.size());
for (auto key : keys)
current_zero_columns.emplace_back(current_columns[key]->cloneEmpty()->cloneResized(num_rows));
auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, final);
auto num_rows = rollup_block.rows();
return Chunk(rollup_block.getColumns(), num_rows);
}
bool CubeTransform::canGenerate()
void CubeTransform::consume(Chunk chunk)
{
return consumed_chunk;
consumed_chunks.emplace_back(std::move(chunk));
}
Chunk CubeTransform::generate()
{
auto gen_chunk = std::move(consumed_chunk);
if (!consumed_chunks.empty())
{
if (consumed_chunks.size() > 1)
cube_chunk = merge(std::move(consumed_chunks), false);
else
cube_chunk = std::move(consumed_chunks.front());
consumed_chunks.clear();
auto num_rows = cube_chunk.getNumRows();
mask = (UInt64(1) << keys.size()) - 1;
current_columns = cube_chunk.getColumns();
current_zero_columns.clear();
current_zero_columns.reserve(keys.size());
for (auto key : keys)
current_zero_columns.emplace_back(current_columns[key]->cloneEmpty()->cloneResized(num_rows));
}
auto gen_chunk = std::move(cube_chunk);
if (mask)
{
@ -47,11 +64,9 @@ Chunk CubeTransform::generate()
if ((mask & (UInt64(1) << (size - i - 1))) == 0)
columns[keys[i]] = current_zero_columns[i];
BlocksList cube_blocks = { getInputPort().getHeader().cloneWithColumns(columns) };
auto cube_block = params->aggregator.mergeBlocks(cube_blocks, false);
auto num_rows = cube_block.rows();
consumed_chunk = Chunk(cube_block.getColumns(), num_rows);
Chunks chunks;
chunks.emplace_back(std::move(columns), current_columns.front()->size());
cube_chunk = merge(std::move(chunks), false);
}
finalizeChunk(gen_chunk);

View File

@ -8,7 +8,7 @@ namespace DB
/// Takes blocks after grouping, with non-finalized aggregate functions.
/// Calculates all subsets of columns and aggregates over them.
class CubeTransform : public IInflatingTransform
class CubeTransform : public IAccumulatingTransform
{
public:
CubeTransform(Block header, AggregatingTransformParamsPtr params);
@ -16,20 +16,20 @@ public:
protected:
void consume(Chunk chunk) override;
bool canGenerate() override;
Chunk generate() override;
private:
AggregatingTransformParamsPtr params;
ColumnNumbers keys;
Chunk consumed_chunk;
Chunks consumed_chunks;
Chunk cube_chunk;
Columns current_columns;
Columns current_zero_columns;
UInt64 mask = 0;
Chunk merge(Chunks && chunks, bool final);
};
}

View File

@ -12,7 +12,7 @@ static Block transformHeader(Block header, const ExpressionActionsPtr & expressi
ExpressionTransform::ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_, bool on_totals_, bool default_totals_)
: ISimpleTransform(header_, transformHeader(header_, expression), on_totals_)
: ISimpleTransform(header_, transformHeader(header_, expression_), on_totals_)
, expression(std::move(expression_))
, on_totals(on_totals_)
, default_totals(default_totals_)

View File

@ -334,7 +334,7 @@ void MergingAggregatedBucketTransform::transform(Chunk & chunk)
SortingAggregatedTransform::SortingAggregatedTransform(size_t num_inputs_, AggregatingTransformParamsPtr params_)
: IProcessor(InputPorts(num_inputs_, params->getHeader()), {params_->getHeader()})
: IProcessor(InputPorts(num_inputs_, params_->getHeader()), {params_->getHeader()})
, num_inputs(num_inputs_)
, params(std::move(params_))
, last_bucket_number(num_inputs, -1)

View File

@ -1,6 +1,7 @@
#include <Processors/Transforms/MergingSortedTransform.h>
#include <DataStreams/ColumnGathererStream.h>
#include <IO/WriteBuffer.h>
#include <DataStreams/materializeBlock.h>
namespace DB
{
@ -13,7 +14,7 @@ MergingSortedTransform::MergingSortedTransform(
UInt64 limit_,
bool quiet_,
bool have_all_inputs_)
: IProcessor(InputPorts(num_inputs, header), {header})
: IProcessor(InputPorts(num_inputs, header), {materializeBlock(header)})
, description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
, have_all_inputs(have_all_inputs_)
, merged_data(header), source_chunks(num_inputs), cursors(num_inputs)

View File

@ -93,7 +93,7 @@ protected:
columns = chunk.mutateColumns();
if (limit_rows && num_rows > limit_rows)
for (auto & column : columns)
column = (*column->cut(0, limit_rows)).mutate();
column = (*column->cut(0, limit_rows)->convertToFullColumnIfConst()).mutate();
total_merged_rows += num_rows;
merged_rows = num_rows;
@ -165,6 +165,13 @@ private:
void updateCursor(Chunk chunk, size_t source_num)
{
auto num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
for (auto & column : columns)
column = column->convertToFullColumnIfConst();
chunk.setColumns(std::move(columns), num_rows);
auto & shared_chunk_ptr = source_chunks[source_num];
if (!shared_chunk_ptr)

View File

@ -5,7 +5,7 @@ namespace DB
{
RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr params_)
: IInflatingTransform(std::move(header), params_->getHeader())
: IAccumulatingTransform(std::move(header), params_->getHeader())
, params(std::move(params_))
, keys(params->params.keys)
{
@ -13,18 +13,34 @@ RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr par
void RollupTransform::consume(Chunk chunk)
{
consumed_chunk = std::move(chunk);
last_removed_key = keys.size();
consumed_chunks.emplace_back(std::move(chunk));
}
bool RollupTransform::canGenerate()
Chunk RollupTransform::merge(Chunks && chunks, bool final)
{
return consumed_chunk;
BlocksList rollup_blocks;
for (auto & chunk : chunks)
rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, final);
auto num_rows = rollup_block.rows();
return Chunk(rollup_block.getColumns(), num_rows);
}
Chunk RollupTransform::generate()
{
auto gen_chunk = std::move(consumed_chunk);
if (!consumed_chunks.empty())
{
if (consumed_chunks.size() > 1)
rollup_chunk = merge(std::move(consumed_chunks), false);
else
rollup_chunk = std::move(consumed_chunks.front());
consumed_chunks.clear();
last_removed_key = keys.size();
}
auto gen_chunk = std::move(rollup_chunk);
if (last_removed_key)
{
@ -35,11 +51,9 @@ Chunk RollupTransform::generate()
auto columns = gen_chunk.getColumns();
columns[key] = columns[key]->cloneEmpty()->cloneResized(num_rows);
BlocksList rollup_blocks = { getInputPort().getHeader().cloneWithColumns(columns) };
auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, false);
num_rows = rollup_block.rows();
consumed_chunk = Chunk(rollup_block.getColumns(), num_rows);
Chunks chunks;
chunks.emplace_back(std::move(columns), num_rows);
rollup_chunk = merge(std::move(chunks), false);
}
finalizeChunk(gen_chunk);

View File

@ -1,5 +1,5 @@
#pragma once
#include <Processors/IInflatingTransform.h>
#include <Processors/IAccumulatingTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
namespace DB
@ -7,7 +7,7 @@ namespace DB
/// Takes blocks after grouping, with non-finalized aggregate functions.
/// Calculates subtotals and grand totals values for a set of columns.
class RollupTransform : public IInflatingTransform
class RollupTransform : public IAccumulatingTransform
{
public:
RollupTransform(Block header, AggregatingTransformParamsPtr params);
@ -15,14 +15,16 @@ public:
protected:
void consume(Chunk chunk) override;
bool canGenerate() override;
Chunk generate() override;
private:
AggregatingTransformParamsPtr params;
ColumnNumbers keys;
Chunk consumed_chunk;
Chunks consumed_chunks;
Chunk rollup_chunk;
size_t last_removed_key = 0;
Chunk merge(Chunks && chunks, bool final);
};
}

View File

@ -161,6 +161,8 @@ void TotalsHavingTransform::transform(Chunk & chunk)
if (const_filter_description.always_true)
{
addToTotals(chunk, nullptr);
auto num_rows = columns.front()->size();
chunk.setColumns(std::move(columns), num_rows);
return;
}

View File

@ -1,23 +1,6 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <limits>
#include <Common/ConcurrentBoundedQueue.h>
#include <Poco/Condition.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/StorageLiveView.h>
namespace DB
@ -61,8 +44,8 @@ public:
if (isCancelled() || storage->is_dropped)
return;
IBlockInputStream::cancel(kill);
Poco::FastMutex::ScopedLock lock(storage->mutex);
storage->condition.broadcast();
std::lock_guard lock(storage->mutex);
storage->condition.notify_all();
}
Block getHeader() const override { return storage->getHeader(); }
@ -92,14 +75,14 @@ public:
NonBlockingResult tryRead()
{
return tryRead_(false);
return tryReadImpl(false);
}
protected:
Block readImpl() override
{
/// try reading
return tryRead_(true).first;
return tryReadImpl(true).first;
}
/** tryRead method attempts to read a block in either blocking
@ -107,7 +90,7 @@ protected:
* then method return empty block with flag set to false
* to indicate that method would block to get the next block.
*/
NonBlockingResult tryRead_(bool blocking)
NonBlockingResult tryReadImpl(bool blocking)
{
Block res;
@ -118,7 +101,7 @@ protected:
/// If blocks were never assigned get blocks
if (!blocks)
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
std::lock_guard lock(storage->mutex);
if (!active)
return { Block(), false };
blocks = (*blocks_ptr);
@ -135,7 +118,7 @@ protected:
if (it == end)
{
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
std::unique_lock lock(storage->mutex);
if (!active)
return { Block(), false };
/// If we are done iterating over our blocks
@ -162,7 +145,10 @@ protected:
while (true)
{
UInt64 timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
bool signaled = storage->condition.tryWait(storage->mutex, std::max(static_cast<UInt64>(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec)) / 1000);
/// Or spurious wakeup.
bool signaled = std::cv_status::no_timeout == storage->condition.wait_for(lock,
std::chrono::microseconds(std::max(UInt64(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec))));
if (isCancelled() || storage->is_dropped)
{
@ -181,7 +167,7 @@ protected:
}
}
}
return tryRead_(blocking);
return tryReadImpl(blocking);
}
res = *it;

View File

@ -0,0 +1,74 @@
#pragma once
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/LiveView/StorageLiveView.h>
namespace DB
{
class LiveViewBlockOutputStream : public IBlockOutputStream
{
public:
explicit LiveViewBlockOutputStream(StorageLiveView & storage_) : storage(storage_) {}
void writePrefix() override
{
new_blocks = std::make_shared<Blocks>();
new_blocks_metadata = std::make_shared<BlocksMetadata>();
new_hash = std::make_shared<SipHash>();
}
void writeSuffix() override
{
UInt128 key;
String key_str;
new_hash->get128(key.low, key.high);
key_str = key.toHexString();
std::lock_guard lock(storage.mutex);
if (storage.getBlocksHashKey() != key_str)
{
new_blocks_metadata->hash = key_str;
new_blocks_metadata->version = storage.getBlocksVersion() + 1;
for (auto & block : *new_blocks)
{
block.insert({DataTypeUInt64().createColumnConst(
block.rows(), new_blocks_metadata->version)->convertToFullColumnIfConst(),
std::make_shared<DataTypeUInt64>(),
"_version"});
}
(*storage.blocks_ptr) = new_blocks;
(*storage.blocks_metadata_ptr) = new_blocks_metadata;
storage.condition.notify_all();
}
new_blocks.reset();
new_blocks_metadata.reset();
new_hash.reset();
}
void write(const Block & block) override
{
new_blocks->push_back(block);
block.updateHash(*new_hash);
}
Block getHeader() const override { return storage.getHeader(); }
private:
using SipHashPtr = std::shared_ptr<SipHash>;
BlocksPtr new_blocks;
BlocksMetadataPtr new_blocks_metadata;
SipHashPtr new_hash;
StorageLiveView & storage;
};
}

View File

@ -12,9 +12,9 @@ limitations under the License. */
#pragma once
#include <Parsers/ASTAlterQuery.h>
#include <optional>
#include <Storages/StorageLiveView.h>
#include <Parsers/ASTAlterQuery.h>
#include <Storages/LiveView/StorageLiveView.h>
namespace DB
{

View File

@ -9,11 +9,9 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <limits>
#include <Common/ConcurrentBoundedQueue.h>
#include <Poco/Condition.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
@ -21,7 +19,7 @@ limitations under the License. */
#include <Columns/ColumnsNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/StorageLiveView.h>
#include <Storages/LiveView/StorageLiveView.h>
namespace DB
@ -66,8 +64,8 @@ public:
if (isCancelled() || storage->is_dropped)
return;
IBlockInputStream::cancel(kill);
Poco::FastMutex::ScopedLock lock(storage->mutex);
storage->condition.broadcast();
std::lock_guard lock(storage->mutex);
storage->condition.notify_all();
}
Block getHeader() const override
@ -103,7 +101,7 @@ public:
NonBlockingResult tryRead()
{
return tryRead_(false);
return tryReadImpl(false);
}
Block getEventBlock()
@ -120,7 +118,7 @@ protected:
Block readImpl() override
{
/// try reading
return tryRead_(true).first;
return tryReadImpl(true).first;
}
/** tryRead method attempts to read a block in either blocking
@ -128,7 +126,7 @@ protected:
* then method return empty block with flag set to false
* to indicate that method would block to get the next block.
*/
NonBlockingResult tryRead_(bool blocking)
NonBlockingResult tryReadImpl(bool blocking)
{
if (has_limit && num_updates == static_cast<Int64>(limit))
{
@ -137,7 +135,7 @@ protected:
/// If blocks were never assigned get blocks
if (!blocks)
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
std::lock_guard lock(storage->mutex);
if (!active)
return { Block(), false };
blocks = (*blocks_ptr);
@ -155,7 +153,7 @@ protected:
if (it == end)
{
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
std::unique_lock lock(storage->mutex);
if (!active)
return { Block(), false };
/// If we are done iterating over our blocks
@ -183,7 +181,10 @@ protected:
while (true)
{
UInt64 timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
bool signaled = storage->condition.tryWait(storage->mutex, std::max(static_cast<UInt64>(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec)) / 1000);
/// Or spurious wakeup.
bool signaled = std::cv_status::no_timeout == storage->condition.wait_for(lock,
std::chrono::microseconds(std::max(UInt64(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec))));
if (isCancelled() || storage->is_dropped)
{
@ -202,7 +203,7 @@ protected:
}
}
}
return tryRead_(blocking);
return tryReadImpl(blocking);
}
// move right to the end

View File

@ -9,6 +9,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTWatchQuery.h>
@ -17,15 +18,23 @@ limitations under the License. */
#include <Parsers/ASTLiteral.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/LiveViewBlockInputStream.h>
#include <DataStreams/LiveViewEventsBlockInputStream.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/BlocksBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Common/typeid_cast.h>
#include <Common/SipHash.h>
#include <Storages/LiveView/StorageLiveView.h>
#include <Storages/LiveView/LiveViewBlockInputStream.h>
#include <Storages/LiveView/LiveViewBlockOutputStream.h>
#include <Storages/LiveView/LiveViewEventsBlockInputStream.h>
#include <Storages/LiveView/ProxyStorage.h>
#include <Storages/StorageLiveView.h>
#include <Storages/StorageFactory.h>
#include <Storages/ProxyStorage.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
@ -40,6 +49,7 @@ namespace ErrorCodes
extern const int INCORRECT_QUERY;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW;
extern const int SUPPORT_IS_DISABLED;
}
static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name)
@ -98,6 +108,110 @@ static void checkAllowedQueries(const ASTSelectQuery & query)
}
}
void StorageLiveView::writeIntoLiveView(
StorageLiveView & live_view,
const Block & block,
const Context & context)
{
BlockOutputStreamPtr output = std::make_shared<LiveViewBlockOutputStream>(live_view);
/// Check if live view has any readers if not
/// just reset blocks to empty and do nothing else
/// When first reader comes the blocks will be read.
{
std::lock_guard lock(live_view.mutex);
if (!live_view.hasActiveUsers())
{
live_view.reset();
return;
}
}
bool is_block_processed = false;
BlockInputStreams from;
BlocksPtrs mergeable_blocks;
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
{
std::lock_guard lock(live_view.mutex);
mergeable_blocks = live_view.getMergeableBlocks();
if (!mergeable_blocks || mergeable_blocks->size() >= context.getGlobalContext().getSettingsRef().max_live_view_insert_blocks_before_refresh)
{
mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
BlocksPtr base_mergeable_blocks = std::make_shared<Blocks>();
InterpreterSelectQuery interpreter(live_view.getInnerQuery(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
interpreter.execute().in);
while (Block this_block = view_mergeable_stream->read())
base_mergeable_blocks->push_back(this_block);
mergeable_blocks->push_back(base_mergeable_blocks);
live_view.setMergeableBlocks(mergeable_blocks);
/// Create from streams
for (auto & blocks_ : *mergeable_blocks)
{
if (blocks_->empty())
continue;
auto sample_block = blocks_->front().cloneEmpty();
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
from.push_back(std::move(stream));
}
is_block_processed = true;
}
}
if (!is_block_processed)
{
auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(streams), QueryProcessingStage::FetchColumns);
InterpreterSelectQuery select_block(live_view.getInnerQuery(),
context, proxy_storage,
QueryProcessingStage::WithMergeableState);
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
select_block.execute().in);
while (Block this_block = data_mergeable_stream->read())
new_mergeable_blocks->push_back(this_block);
if (new_mergeable_blocks->empty())
return;
{
std::lock_guard lock(live_view.mutex);
mergeable_blocks = live_view.getMergeableBlocks();
mergeable_blocks->push_back(new_mergeable_blocks);
/// Create from streams
for (auto & blocks_ : *mergeable_blocks)
{
if (blocks_->empty())
continue;
auto sample_block = blocks_->front().cloneEmpty();
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
from.push_back(std::move(stream));
}
}
}
auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(from), QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(live_view.getInnerQuery(), context, proxy_storage, QueryProcessingStage::Complete);
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
/// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
data = std::make_shared<SquashingBlockInputStream>(
data, context.getGlobalContext().getSettingsRef().min_insert_block_size_rows, context.getGlobalContext().getSettingsRef().min_insert_block_size_bytes);
copyData(*data, *output);
}
StorageLiveView::StorageLiveView(
const String & table_name_,
const String & database_name_,
@ -259,11 +373,10 @@ void StorageLiveView::noUsersThread(const UInt64 & timeout)
{
while (1)
{
Poco::FastMutex::ScopedLock lock(noUsersThreadMutex);
if (!noUsersThreadWakeUp && !noUsersThreadCondition.tryWait(noUsersThreadMutex,
timeout * 1000))
std::unique_lock lock(no_users_thread_mutex);
if (!no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return no_users_thread_wakeup; }))
{
noUsersThreadWakeUp = false;
no_users_thread_wakeup = false;
if (shutdown_called)
return;
if (hasUsers())
@ -301,7 +414,7 @@ void StorageLiveView::noUsersThread(const UInt64 & timeout)
void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
{
bool expected = false;
if (!startnousersthread_called.compare_exchange_strong(expected, true))
if (!start_no_users_thread_called.compare_exchange_strong(expected, true))
return;
if (is_dropped)
@ -312,20 +425,20 @@ void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
if (no_users_thread.joinable())
{
{
Poco::FastMutex::ScopedLock lock(noUsersThreadMutex);
noUsersThreadWakeUp = true;
noUsersThreadCondition.signal();
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
}
no_users_thread.join();
}
{
Poco::FastMutex::ScopedLock lock(noUsersThreadMutex);
noUsersThreadWakeUp = false;
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = false;
}
if (!is_dropped)
no_users_thread = std::thread(&StorageLiveView::noUsersThread, this, timeout);
}
startnousersthread_called = false;
start_no_users_thread_called = false;
}
void StorageLiveView::startup()
@ -341,13 +454,13 @@ void StorageLiveView::shutdown()
if (no_users_thread.joinable())
{
Poco::FastMutex::ScopedLock lock(noUsersThreadMutex);
noUsersThreadWakeUp = true;
noUsersThreadCondition.signal();
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
/// Must detach the no users thread
/// as we can't join it as it will result
/// in a deadlock
no_users_thread.detach();
no_users_thread.detach(); /// TODO Not viable at all.
}
}
@ -361,18 +474,19 @@ void StorageLiveView::drop()
global_context.removeDependency(
DatabaseAndTableName(select_database_name, select_table_name),
DatabaseAndTableName(database_name, table_name));
Poco::FastMutex::ScopedLock lock(mutex);
std::lock_guard lock(mutex);
is_dropped = true;
condition.broadcast();
condition.notify_all();
}
void StorageLiveView::refresh(const Context & context)
{
auto alter_lock = lockAlterIntention(context.getCurrentQueryId());
{
Poco::FastMutex::ScopedLock lock(mutex);
std::lock_guard lock(mutex);
if (getNewBlocks())
condition.broadcast();
condition.notify_all();
}
}
@ -387,11 +501,11 @@ BlockInputStreams StorageLiveView::read(
/// add user to the blocks_ptr
std::shared_ptr<BlocksPtr> stream_blocks_ptr = blocks_ptr;
{
Poco::FastMutex::ScopedLock lock(mutex);
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
{
if (getNewBlocks())
condition.broadcast();
condition.notify_all();
}
}
return { std::make_shared<BlocksBlockInputStream>(stream_blocks_ptr, getHeader()) };
@ -423,17 +537,17 @@ BlockInputStreams StorageLiveView::watch(
if (no_users_thread.joinable())
{
Poco::FastMutex::ScopedLock lock(noUsersThreadMutex);
noUsersThreadWakeUp = true;
noUsersThreadCondition.signal();
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
}
{
Poco::FastMutex::ScopedLock lock(mutex);
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
{
if (getNewBlocks())
condition.broadcast();
condition.notify_all();
}
}
@ -448,17 +562,17 @@ BlockInputStreams StorageLiveView::watch(
if (no_users_thread.joinable())
{
Poco::FastMutex::ScopedLock lock(noUsersThreadMutex);
noUsersThreadWakeUp = true;
noUsersThreadCondition.signal();
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
}
{
Poco::FastMutex::ScopedLock lock(mutex);
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
{
if (getNewBlocks())
condition.broadcast();
condition.notify_all();
}
}
@ -472,6 +586,9 @@ void registerStorageLiveView(StorageFactory & factory)
{
factory.registerStorage("LiveView", [](const StorageFactory::Arguments & args)
{
if (!args.local_context.getSettingsRef().allow_experimental_live_view)
throw Exception("Experimental LIVE VIEW feature is not enabled (the setting 'allow_experimental_live_view')", ErrorCodes::SUPPORT_IS_DISABLED);
return StorageLiveView::create(args.table_name, args.database_name, args.local_context, args.query, args.columns);
});
}

View File

@ -0,0 +1,184 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <mutex>
#include <condition_variable>
namespace DB
{
struct BlocksMetadata
{
String hash;
UInt64 version;
};
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
using BlocksMetadataPtr = std::shared_ptr<BlocksMetadata>;
class StorageLiveView : public ext::shared_ptr_helper<StorageLiveView>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageLiveView>;
friend class LiveViewBlockInputStream;
friend class LiveViewEventsBlockInputStream;
friend class LiveViewBlockOutputStream;
public:
~StorageLiveView() override;
String getName() const override { return "LiveView"; }
String getTableName() const override { return table_name; }
String getDatabaseName() const override { return database_name; }
String getSelectDatabaseName() const { return select_database_name; }
String getSelectTableName() const { return select_table_name; }
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
// const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
ASTPtr getInnerQuery() const { return inner_query->clone(); }
/// It is passed inside the query and solved at its level.
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
bool isTemporary() { return is_temporary; }
/// Check if we have any readers
/// must be called with mutex locked
bool hasUsers()
{
return blocks_ptr.use_count() > 1;
}
/// Check we have any active readers
/// must be called with mutex locked
bool hasActiveUsers()
{
return active_ptr.use_count() > 1;
}
/// Background thread for temporary tables
/// which drops this table if there are no users
void startNoUsersThread(const UInt64 & timeout);
std::mutex no_users_thread_mutex;
bool no_users_thread_wakeup{false};
std::condition_variable no_users_thread_condition;
/// Get blocks hash
/// must be called with mutex locked
String getBlocksHashKey()
{
if (*blocks_metadata_ptr)
return (*blocks_metadata_ptr)->hash;
return {};
}
/// Get blocks version
/// must be called with mutex locked
UInt64 getBlocksVersion()
{
if (*blocks_metadata_ptr)
return (*blocks_metadata_ptr)->version;
return 0;
}
/// Reset blocks
/// must be called with mutex locked
void reset()
{
(*blocks_ptr).reset();
if (*blocks_metadata_ptr)
(*blocks_metadata_ptr)->hash.clear();
mergeable_blocks.reset();
}
void checkTableCanBeDropped() const override;
void drop() override;
void startup() override;
void shutdown() override;
void refresh(const Context & context);
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
BlockInputStreams watch(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
std::shared_ptr<BlocksPtr> getBlocksPtr() { return blocks_ptr; }
BlocksPtrs getMergeableBlocks() { return mergeable_blocks; }
void setMergeableBlocks(BlocksPtrs blocks) { mergeable_blocks = blocks; }
std::shared_ptr<bool> getActivePtr() { return active_ptr; }
/// Read new data blocks that store query result
bool getNewBlocks();
Block getHeader() const;
static void writeIntoLiveView(
StorageLiveView & live_view,
const Block & block,
const Context & context);
private:
String select_database_name;
String select_table_name;
String table_name;
String database_name;
ASTPtr inner_query;
Context & global_context;
bool is_temporary {false};
mutable Block sample_block;
/// Mutex for the blocks and ready condition
std::mutex mutex;
/// New blocks ready condition to broadcast to readers
/// that new blocks are available
std::condition_variable condition;
/// Active users
std::shared_ptr<bool> active_ptr;
/// Current data blocks that store query result
std::shared_ptr<BlocksPtr> blocks_ptr;
/// Current data blocks metadata
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
BlocksPtrs mergeable_blocks;
void noUsersThread(const UInt64 & timeout);
std::thread no_users_thread;
std::atomic<bool> shutdown_called{false};
std::atomic<bool> start_no_users_thread_called{false};
UInt64 temporary_live_view_timeout;
StorageLiveView(
const String & table_name_,
const String & database_name_,
Context & local_context,
const ASTCreateQuery & query,
const ColumnsDescription & columns
);
};
}

View File

@ -157,7 +157,14 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation()
{
return static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_RESERVE);
size_t total_threads_in_pool = pool.getNumberOfThreads();
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
/// Allow mutations only if there are enough threads, leave free threads for merges else
if (total_threads_in_pool - busy_threads_in_pool >= data.settings.number_of_free_entries_in_pool_to_execute_mutation)
return static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_RESERVE);
return 0;
}

View File

@ -30,8 +30,10 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
/** Merge settings. */ \
M(SettingUInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024, "Maximum in total size of parts to merge, when there are maximum free threads in background pool (or entries in replication queue).") \
M(SettingUInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024, "Maximum in total size of parts to merge, when there are minimum free threads in background pool (or entries in replication queue).") \
M(SettingUInt64, max_replicated_merges_in_queue, 16, "How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue.") \
M(SettingUInt64, max_replicated_merges_in_queue, 16, "How many tasks of merging and mutating parts are allowed simultaneously in ReplicatedMergeTree queue.") \
M(SettingUInt64, max_replicated_mutations_in_queue, 8, "How many tasks of mutating parts are allowed simultaneously in ReplicatedMergeTree queue.") \
M(SettingUInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge to process (or to put in queue). This is to allow small merges to process - not filling the pool with long running merges.") \
M(SettingUInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"") \
M(SettingSeconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.") \
M(SettingSeconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.") \
\

View File

@ -956,15 +956,19 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false;
}
/** Execute merge only if there are enough free threads in background pool to do merges of that size.
* But if all threads are free (maximal size of merge is allowed) then execute any merge,
* (because it may be ordered by OPTIMIZE or early with differrent settings).
UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge()
: merger_mutator.getMaxSourcePartSizeForMutation();
/** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed),
* then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size,
* because it may be ordered by OPTIMIZE or early with different settings.
* Setting max_bytes_to_merge_at_max_space_in_pool still working for regular merges,
* because the leader replica does not assign merges of greater size (except OPTIMIZE PARTITION and OPTIMIZE FINAL).
*/
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
if (max_source_parts_size != data.settings.max_bytes_to_merge_at_max_space_in_pool
&& sum_parts_size_in_bytes > max_source_parts_size)
bool ignore_max_size = (entry.type == LogEntry::MERGE_PARTS) && (max_source_parts_size == data.settings.max_bytes_to_merge_at_max_space_in_pool);
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
{
String reason = "Not executing log entry for part " + entry.new_part_name
String reason = "Not executing log entry " + entry.typeToString() + " for part " + entry.new_part_name
+ " because source parts size (" + formatReadableSizeWithBinarySuffix(sum_parts_size_in_bytes)
+ ") is greater than the current maximum (" + formatReadableSizeWithBinarySuffix(max_source_parts_size) + ").";
LOG_DEBUG(log, reason);
@ -1154,17 +1158,21 @@ bool ReplicatedMergeTreeQueue::processEntry(
}
size_t ReplicatedMergeTreeQueue::countMergesAndPartMutations() const
std::pair<size_t, size_t> ReplicatedMergeTreeQueue::countMergesAndPartMutations() const
{
std::lock_guard lock(state_mutex);
size_t count = 0;
size_t count_merges = 0;
size_t count_mutations = 0;
for (const auto & entry : queue)
if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS
|| entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART)
++count;
{
if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS)
++count_merges;
else if (entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART)
++count_mutations;
}
return count;
return std::make_pair(count_merges, count_mutations);
}

View File

@ -296,7 +296,7 @@ public:
bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func);
/// Count the number of merges and mutations of single parts in the queue.
size_t countMergesAndPartMutations() const;
std::pair<size_t, size_t> countMergesAndPartMutations() const;
/// Count the total number of active mutations.
size_t countMutations() const;

View File

@ -68,14 +68,6 @@ StoragePtr StorageFactory::get(
name = "LiveView";
}
else if (query.is_live_channel)
{
if (query.storage)
throw Exception("Specifying ENGINE is not allowed for a LiveChannel", ErrorCodes::INCORRECT_QUERY);
name = "LiveChannel";
}
else
{
/// Check for some special types, that are not allowed to be stored in tables. Example: NULL data type.
@ -137,12 +129,6 @@ StoragePtr StorageFactory::get(
"Direct creation of tables with ENGINE LiveView is not supported, use CREATE LIVE VIEW statement",
ErrorCodes::INCORRECT_QUERY);
}
else if (name == "LiveChannel")
{
throw Exception(
"Direct creation of tables with ENGINE LiveChannel is not supported, use CREATE LIVE CHANNEL statement",
ErrorCodes::INCORRECT_QUERY);
}
}
}

View File

@ -1,347 +0,0 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <Poco/Condition.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/BlocksBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypesNumber.h>
#include <ext/shared_ptr_helper.h>
#include <Common/SipHash.h>
#include <Storages/IStorage.h>
#include <Storages/ProxyStorage.h>
namespace DB
{
class IAST;
struct BlocksMetadata
{
String hash;
UInt64 version;
};
using ASTPtr = std::shared_ptr<IAST>;
using BlocksMetadataPtr = std::shared_ptr<BlocksMetadata>;
using SipHashPtr = std::shared_ptr<SipHash>;
class StorageLiveView : public ext::shared_ptr_helper<StorageLiveView>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageLiveView>;
friend class LiveViewBlockOutputStream;
public:
~StorageLiveView() override;
String getName() const override { return "LiveView"; }
String getTableName() const override { return table_name; }
String getDatabaseName() const override { return database_name; }
String getSelectDatabaseName() const { return select_database_name; }
String getSelectTableName() const { return select_table_name; }
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
// const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
ASTPtr getInnerQuery() const { return inner_query->clone(); }
/// It is passed inside the query and solved at its level.
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
/// Mutex for the blocks and ready condition
Poco::FastMutex mutex;
/// New blocks ready condition to broadcast to readers
/// that new blocks are available
Poco::Condition condition;
bool isTemporary() { return is_temporary; }
/// Check if we have any readers
/// must be called with mutex locked
bool hasUsers()
{
return blocks_ptr.use_count() > 1;
}
/// Check we have any active readers
/// must be called with mutex locked
bool hasActiveUsers()
{
return active_ptr.use_count() > 1;
}
/// Background thread for temporary tables
/// which drops this table if there are no users
void startNoUsersThread(const UInt64 & timeout);
Poco::FastMutex noUsersThreadMutex;
bool noUsersThreadWakeUp{false};
Poco::Condition noUsersThreadCondition;
/// Get blocks hash
/// must be called with mutex locked
String getBlocksHashKey()
{
if (*blocks_metadata_ptr)
return (*blocks_metadata_ptr)->hash;
return "";
}
/// Get blocks version
/// must be called with mutex locked
UInt64 getBlocksVersion()
{
if (*blocks_metadata_ptr)
return (*blocks_metadata_ptr)->version;
return 0;
}
/// Reset blocks
/// must be called with mutex locked
void reset()
{
(*blocks_ptr).reset();
if (*blocks_metadata_ptr)
(*blocks_metadata_ptr)->hash.clear();
mergeable_blocks.reset();
}
void checkTableCanBeDropped() const override;
void drop() override;
void startup() override;
void shutdown() override;
void refresh(const Context & context);
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
BlockInputStreams watch(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
std::shared_ptr<BlocksPtr> getBlocksPtr() { return blocks_ptr; }
BlocksPtrs getMergeableBlocks() { return mergeable_blocks; }
void setMergeableBlocks(BlocksPtrs blocks) { mergeable_blocks = blocks; }
std::shared_ptr<bool> getActivePtr() { return active_ptr; }
/// Read new data blocks that store query result
bool getNewBlocks();
Block getHeader() const;
static void writeIntoLiveView(StorageLiveView & live_view,
const Block & block,
const Context & context,
BlockOutputStreamPtr & output)
{
/// Check if live view has any readers if not
/// just reset blocks to empty and do nothing else
/// When first reader comes the blocks will be read.
{
Poco::FastMutex::ScopedLock lock(live_view.mutex);
if (!live_view.hasActiveUsers())
{
live_view.reset();
return;
}
}
bool is_block_processed = false;
BlockInputStreams from;
BlocksPtrs mergeable_blocks;
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
{
Poco::FastMutex::ScopedLock lock(live_view.mutex);
mergeable_blocks = live_view.getMergeableBlocks();
if (!mergeable_blocks || mergeable_blocks->size() >= context.getGlobalContext().getSettingsRef().max_live_view_insert_blocks_before_refresh)
{
mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
BlocksPtr base_mergeable_blocks = std::make_shared<Blocks>();
InterpreterSelectQuery interpreter(live_view.getInnerQuery(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
interpreter.execute().in);
while (Block this_block = view_mergeable_stream->read())
base_mergeable_blocks->push_back(this_block);
mergeable_blocks->push_back(base_mergeable_blocks);
live_view.setMergeableBlocks(mergeable_blocks);
/// Create from streams
for (auto & blocks_ : *mergeable_blocks)
{
if (blocks_->empty())
continue;
auto sample_block = blocks_->front().cloneEmpty();
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
from.push_back(std::move(stream));
}
is_block_processed = true;
}
}
if (!is_block_processed)
{
auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(streams), QueryProcessingStage::FetchColumns);
InterpreterSelectQuery select_block(live_view.getInnerQuery(),
context, proxy_storage,
QueryProcessingStage::WithMergeableState);
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
select_block.execute().in);
while (Block this_block = data_mergeable_stream->read())
new_mergeable_blocks->push_back(this_block);
if (new_mergeable_blocks->empty())
return;
{
Poco::FastMutex::ScopedLock lock(live_view.mutex);
mergeable_blocks = live_view.getMergeableBlocks();
mergeable_blocks->push_back(new_mergeable_blocks);
/// Create from streams
for (auto & blocks_ : *mergeable_blocks)
{
if (blocks_->empty())
continue;
auto sample_block = blocks_->front().cloneEmpty();
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
from.push_back(std::move(stream));
}
}
}
auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(from), QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(live_view.getInnerQuery(), context, proxy_storage, QueryProcessingStage::Complete);
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
/// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
data = std::make_shared<SquashingBlockInputStream>(
data, context.getGlobalContext().getSettingsRef().min_insert_block_size_rows, context.getGlobalContext().getSettingsRef().min_insert_block_size_bytes);
copyData(*data, *output);
}
private:
String select_database_name;
String select_table_name;
String table_name;
String database_name;
ASTPtr inner_query;
Context & global_context;
bool is_temporary {false};
mutable Block sample_block;
/// Active users
std::shared_ptr<bool> active_ptr;
/// Current data blocks that store query result
std::shared_ptr<BlocksPtr> blocks_ptr;
/// Current data blocks metadata
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
BlocksPtrs mergeable_blocks;
void noUsersThread(const UInt64 & timeout);
std::thread no_users_thread;
std::atomic<bool> shutdown_called{false};
std::atomic<bool> startnousersthread_called{false};
UInt64 temporary_live_view_timeout;
StorageLiveView(
const String & table_name_,
const String & database_name_,
Context & local_context,
const ASTCreateQuery & query,
const ColumnsDescription & columns
);
};
class LiveViewBlockOutputStream : public IBlockOutputStream
{
public:
explicit LiveViewBlockOutputStream(StorageLiveView & storage_) : storage(storage_) {}
void writePrefix() override
{
new_blocks = std::make_shared<Blocks>();
new_blocks_metadata = std::make_shared<BlocksMetadata>();
new_hash = std::make_shared<SipHash>();
}
void writeSuffix() override
{
UInt128 key;
String key_str;
new_hash->get128(key.low, key.high);
key_str = key.toHexString();
Poco::FastMutex::ScopedLock lock(storage.mutex);
if (storage.getBlocksHashKey() != key_str)
{
new_blocks_metadata->hash = key_str;
new_blocks_metadata->version = storage.getBlocksVersion() + 1;
for (auto & block : *new_blocks)
{
block.insert({DataTypeUInt64().createColumnConst(
block.rows(), new_blocks_metadata->version)->convertToFullColumnIfConst(),
std::make_shared<DataTypeUInt64>(),
"_version"});
}
(*storage.blocks_ptr) = new_blocks;
(*storage.blocks_metadata_ptr) = new_blocks_metadata;
storage.condition.broadcast();
}
new_blocks.reset();
new_blocks_metadata.reset();
new_hash.reset();
}
void write(const Block & block) override
{
new_blocks->push_back(block);
block.updateHash(*new_hash);
}
Block getHeader() const override { return storage.getHeader(); }
private:
BlocksPtr new_blocks;
BlocksMetadataPtr new_blocks_metadata;
SipHashPtr new_hash;
StorageLiveView & storage;
};
}

View File

@ -630,8 +630,6 @@ bool StorageMergeTree::tryMutatePart()
/// You must call destructor with unlocked `currently_merging_mutex`.
std::optional<CurrentlyMergingPartsTagger> tagger;
{
auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
std::lock_guard lock(currently_merging_mutex);
if (current_mutations_by_version.empty())
@ -647,8 +645,7 @@ bool StorageMergeTree::tryMutatePart()
if (mutations_begin_it == mutations_end_it)
continue;
auto estimated_needed_space = MergeTreeDataMergerMutator::estimateNeededDiskSpace({part});
if (estimated_needed_space > disk_space)
if (merger_mutator.getMaxSourcePartSizeForMutation() < part->bytes_on_disk)
continue;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
@ -661,7 +658,7 @@ bool StorageMergeTree::tryMutatePart()
future_part.part_info = new_part_info;
future_part.name = part->getNewName(new_part_info);
tagger.emplace(future_part, estimated_needed_space, *this);
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this);
break;
}
}

View File

@ -2219,17 +2219,18 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
/// If many merges is already queued, then will queue only small enough merges.
/// Otherwise merge queue could be filled with only large merges,
/// and in the same time, many small parts could be created and won't be merged.
size_t merges_and_mutations_queued = queue.countMergesAndPartMutations();
if (merges_and_mutations_queued >= settings.max_replicated_merges_in_queue)
auto merges_and_mutations_queued = queue.countMergesAndPartMutations();
size_t merges_and_mutations_sum = merges_and_mutations_queued.first + merges_and_mutations_queued.second;
if (merges_and_mutations_sum >= settings.max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges and part mutations (" << merges_and_mutations_queued
<< ") is greater than max_replicated_merges_in_queue ("
LOG_TRACE(log, "Number of queued merges (" << merges_and_mutations_queued.first << ") and part mutations ("
<< merges_and_mutations_queued.second << ") is greater than max_replicated_merges_in_queue ("
<< settings.max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate.");
}
else
{
UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
settings.max_replicated_merges_in_queue, merges_and_mutations_queued);
settings.max_replicated_merges_in_queue, merges_and_mutations_sum);
UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
FutureMergedMutatedPart future_merged_part;
@ -2239,7 +2240,9 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
future_merged_part.name, deduplicate, force_ttl);
}
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0)
/// If there are many mutations in queue it may happen, that we cannot enqueue enough merges to merge all new parts
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0
&& merges_and_mutations_queued.second < settings.max_replicated_mutations_in_queue)
{
/// Choose a part to mutate.
DataPartsVector data_parts = getDataPartsVector();

View File

@ -25,7 +25,6 @@ void registerStorageJoin(StorageFactory & factory);
void registerStorageView(StorageFactory & factory);
void registerStorageMaterializedView(StorageFactory & factory);
void registerStorageLiveView(StorageFactory & factory);
//void registerStorageLiveChannel(StorageFactory & factory);
#if USE_HDFS
void registerStorageHDFS(StorageFactory & factory);
@ -67,7 +66,6 @@ void registerStorages()
registerStorageView(factory);
registerStorageMaterializedView(factory);
registerStorageLiveView(factory);
//registerStorageLiveChannel(factory);
#if USE_HDFS
registerStorageHDFS(factory);

View File

@ -0,0 +1,6 @@
<yandex>
<merge_tree>
<parts_to_delay_insert>50</parts_to_delay_insert>
<parts_to_throw_insert>50</parts_to_throw_insert>
</merge_tree>
</yandex>

View File

@ -10,21 +10,29 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True)
node1 = cluster.add_instance('node1', macros={'cluster': 'test1'}, with_zookeeper=True)
# Check, that limits on max part size for merges doesn`t affect mutations
node2 = cluster.add_instance('node2', main_configs=["configs/merge_tree.xml"], with_zookeeper=True)
nodes = [node1, node2]
node2 = cluster.add_instance('node2', macros={'cluster': 'test1'}, main_configs=["configs/merge_tree.xml"], with_zookeeper=True)
node3 = cluster.add_instance('node3', macros={'cluster': 'test2'}, main_configs=["configs/merge_tree_max_parts.xml"], with_zookeeper=True)
node4 = cluster.add_instance('node4', macros={'cluster': 'test2'}, main_configs=["configs/merge_tree_max_parts.xml"], with_zookeeper=True)
node5 = cluster.add_instance('node5', macros={'cluster': 'test3'}, main_configs=["configs/merge_tree_max_parts.xml"])
all_nodes = [node1, node2, node3, node4, node5]
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for node in nodes:
for node in all_nodes:
node.query("DROP TABLE IF EXISTS test_mutations")
for node in nodes:
node.query("CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/test_mutations', '{instance}') ORDER BY x PARTITION BY toYYYYMM(d)")
for node in [node1, node2, node3, node4]:
node.query("CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/test_mutations', '{instance}') ORDER BY x PARTITION BY toYYYYMM(d)")
node5.query("CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE MergeTree() ORDER BY x PARTITION BY toYYYYMM(d)")
yield cluster
@ -33,7 +41,8 @@ def started_cluster():
class Runner:
def __init__(self):
def __init__(self, nodes):
self.nodes = nodes
self.mtx = threading.Lock()
self.total_inserted_xs = 0
self.total_inserted_rows = 0
@ -49,7 +58,9 @@ class Runner:
self.stop_ev = threading.Event()
def do_insert(self, thread_num):
self.exceptions = []
def do_insert(self, thread_num, partitions_num):
self.stop_ev.wait(random.random())
# Each thread inserts a small random number of rows with random year, month 01 and day determined
@ -67,7 +78,7 @@ class Runner:
for x in xs:
self.currently_inserting_xs[x] += 1
year = 2000 + random.randint(0, 10)
year = 2000 + random.randint(0, partitions_num)
date_str = '{year}-{month}-{day}'.format(year=year, month=month, day=day)
payload = ''
for x in xs:
@ -76,7 +87,7 @@ class Runner:
try:
print 'thread {}: insert for {}: {}'.format(thread_num, date_str, ','.join(str(x) for x in xs))
random.choice(nodes).query("INSERT INTO test_mutations FORMAT TSV", payload)
random.choice(self.nodes).query("INSERT INTO test_mutations FORMAT TSV", payload)
with self.mtx:
for x in xs:
@ -86,6 +97,7 @@ class Runner:
except Exception, e:
print 'Exception while inserting,', e
self.exceptions.append(e)
finally:
with self.mtx:
for x in xs:
@ -113,7 +125,7 @@ class Runner:
try:
print 'thread {}: delete {} * {}'.format(thread_num, to_delete_count, x)
random.choice(nodes).query("ALTER TABLE test_mutations DELETE WHERE x = {}".format(x))
random.choice(self.nodes).query("ALTER TABLE test_mutations DELETE WHERE x = {}".format(x))
with self.mtx:
self.total_mutations += 1
@ -130,14 +142,27 @@ class Runner:
self.stop_ev.wait(1.0 + random.random() * 2)
def wait_for_mutations(nodes, number_of_mutations):
for i in range(100): # wait for replication 80 seconds max
time.sleep(0.8)
def get_done_mutations(node):
return int(node.query("SELECT sum(is_done) FROM system.mutations WHERE table = 'test_mutations'").rstrip())
if all([get_done_mutations(n) == number_of_mutations for n in nodes]):
return True
return False
def test_mutations(started_cluster):
DURATION_SECONDS = 30
nodes = [node1, node2]
runner = Runner()
runner = Runner(nodes)
threads = []
for thread_num in range(5):
threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, )))
threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, 10)))
for thread_num in (11, 12, 13):
threads.append(threading.Thread(target=runner.do_delete, args=(thread_num,)))
@ -155,18 +180,11 @@ def test_mutations(started_cluster):
assert runner.total_inserted_rows > 0
assert runner.total_mutations > 0
all_done = False
for i in range(100): # wait for replication 80 seconds max
time.sleep(0.8)
all_done = wait_for_mutations(nodes, runner.total_mutations)
def get_done_mutations(node):
return int(node.query("SELECT sum(is_done) FROM system.mutations WHERE table = 'test_mutations'").rstrip())
if all([get_done_mutations(n) == runner.total_mutations for n in nodes]):
all_done = True
break
print node1.query("SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames")
print "Total mutations: ", runner.total_mutations
for node in nodes:
print node.query("SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames")
assert all_done
expected_sum = runner.total_inserted_xs - runner.total_deleted_xs
@ -174,3 +192,44 @@ def test_mutations(started_cluster):
for i, node in enumerate(nodes):
actual_sums.append(int(node.query("SELECT sum(x) FROM test_mutations").rstrip()))
assert actual_sums[i] == expected_sum
@pytest.mark.parametrize(
('nodes', ),
[
([node5, ], ), # MergeTree
([node3, node4], ), # ReplicatedMergeTree
]
)
def test_mutations_dont_prevent_merges(started_cluster, nodes):
for year in range(2000, 2016):
rows = ''
date_str = '{}-01-{}'.format(year, random.randint(1, 10))
for i in range(10):
rows += '{} {} {}\n'.format(date_str, random.randint(1, 10), i)
nodes[0].query("INSERT INTO test_mutations FORMAT TSV", rows)
# will run mutations of 16 parts in parallel, mutations will sleep for about 20 seconds
nodes[0].query("ALTER TABLE test_mutations UPDATE i = sleepEachRow(2) WHERE 1")
runner = Runner(nodes)
threads = []
for thread_num in range(2):
threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, 0)))
# will insert approx 8-10 new parts per 1 second into one partition
for t in threads:
t.start()
all_done = wait_for_mutations(nodes, 1)
runner.stop_ev.set()
for t in threads:
t.join()
for node in nodes:
print node.query("SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames")
print node.query("SELECT partition, count(name), sum(active), sum(active*rows) FROM system.parts WHERE table ='test_mutations' GROUP BY partition FORMAT TSVWithNames")
assert all_done
assert all([str(e).find("Too many parts") < 0 for e in runner.exceptions])

View File

@ -6,4 +6,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
set -o errexit
set -o pipefail
for i in {1..10}; do seq 1 10 | sed 's/.*/SELECT 1 % ((number + 500) % 1000) FROM system.numbers_mt LIMIT 1000;/' | $CLICKHOUSE_CLIENT -n --receive_timeout=1 --max_block_size=1 >/dev/null 2>&1 && echo 'Fail!' && break; echo -n '.'; done; echo
for i in {1..10}; do seq 1 10 | sed 's/.*/SELECT 1 % ((number + 500) % 1000) FROM numbers_mt(1000);/' | $CLICKHOUSE_CLIENT -n --receive_timeout=1 --max_block_size=1 >/dev/null 2>&1 && echo 'Fail!' && break; echo -n '.'; done; echo

View File

@ -1,5 +1,5 @@
SET max_bytes_before_external_group_by = 100000000;
SET max_memory_usage = 201000000;
SET max_memory_usage = 351000000;
SELECT sum(k), sum(c) FROM (SELECT number AS k, count() AS c FROM (SELECT * FROM system.numbers LIMIT 10000000) GROUP BY k);
SELECT sum(k), sum(c), max(u) FROM (SELECT number AS k, count() AS c, uniqArray(range(number % 16)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k);

View File

@ -3,17 +3,18 @@
< X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
9
< X-ClickHouse-Progress: {"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
< X-ClickHouse-Progress: {"read_rows":"2","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
< X-ClickHouse-Progress: {"read_rows":"3","read_bytes":"24","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
< X-ClickHouse-Progress: {"read_rows":"4","read_bytes":"32","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
< X-ClickHouse-Progress: {"read_rows":"5","read_bytes":"40","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
< X-ClickHouse-Progress: {"read_rows":"6","read_bytes":"48","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
< X-ClickHouse-Progress: {"read_rows":"7","read_bytes":"56","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
< X-ClickHouse-Progress: {"read_rows":"8","read_bytes":"64","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
< X-ClickHouse-Progress: {"read_rows":"9","read_bytes":"72","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
< X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
< X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
< X-ClickHouse-Progress: {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"2","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"3","read_bytes":"24","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"4","read_bytes":"32","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"5","read_bytes":"40","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"6","read_bytes":"48","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"7","read_bytes":"56","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"8","read_bytes":"64","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"9","read_bytes":"72","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
< X-ClickHouse-Progress: {"read_rows":"10","read_bytes":"80","written_rows":"0","written_bytes":"0","total_rows_to_read":"10"}
0
1
2

View File

@ -6,9 +6,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?max_block_size=5&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0" -d 'SELECT max(number) FROM numbers(10)' 2>&1 | grep -E 'Content-Encoding|X-ClickHouse-Progress|^[0-9]'
# This test will fail with external poco (progress not supported)
# "grep -v 11" in order to skip extra progress header for 11-th row (for processors pipeline)
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?max_block_size=1&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0&experimental_use_processors=0" -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep -E 'Content-Encoding|X-ClickHouse-Progress|^[0-9]' | grep -v 11
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}?max_block_size=1&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0&enable_http_compression=1&experimental_use_processors=0" -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 10' | gzip -d
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?max_block_size=1&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0" -d 'SELECT number FROM numbers(10)' 2>&1 | grep -E 'Content-Encoding|X-ClickHouse-Progress|^[0-9]'
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}?max_block_size=1&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0&enable_http_compression=1" -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 10' | gzip -d
# 'send_progress_in_http_headers' is false by default
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}?max_block_size=1&http_headers_progress_interval_ms=0" -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep -q 'X-ClickHouse-Progress' && echo 'Fail' || true

View File

@ -1,5 +1,7 @@
0
1 0
3 0
2 0
1
1
1
finished 42 readonly SELECT 2, count() FROM system.numbers
1
44

View File

@ -1,17 +1,18 @@
#!/usr/bin/env bash
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
set -e -o pipefail
function wait_for_query_to_start()
{
while [[ $($CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" -d "SELECT count() FROM system.processes WHERE query_id = '$1'") == 0 ]]; do sleep 0.1; done
}
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT sum(ignore(*)) FROM (SELECT number % 1000 AS k, groupArray(number) FROM numbers(100000000) GROUP BY k)' 2>&1 > /dev/null &
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT 1, count() FROM system.numbers' 2>&1 > /dev/null &
wait_for_query_to_start 'hello'
# Replace it
@ -20,15 +21,20 @@ $CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d
# Wait for it to be replaced
wait
${CLICKHOUSE_CLIENT} --user=readonly --query_id=42 --query='SELECT 1, sleep(1)' &
${CLICKHOUSE_CLIENT} --user=readonly --query_id=42 --query='SELECT 2, count() FROM system.numbers' 2>&1 | grep -cF 'was cancelled' &
wait_for_query_to_start '42'
( ${CLICKHOUSE_CLIENT} --query_id=42 --query='SELECT 43' ||: ) 2>&1 | grep -F 'is already running by user' > /dev/null
# Trying to run another query with the same query_id
${CLICKHOUSE_CLIENT} --query_id=42 --query='SELECT 43' 2>&1 | grep -cF 'is already running by user'
# Trying to replace query of a different user
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=42&replace_running_query=1" -d 'SELECT 1' | grep -cF 'is already running by user'
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" -d "KILL QUERY WHERE query_id = '42' SYNC"
wait
${CLICKHOUSE_CLIENT} --query='SELECT 3, sleep(1)' &
sleep 0.1
${CLICKHOUSE_CLIENT} --query_id=42 --query='SELECT 2, sleep(1)' &
${CLICKHOUSE_CLIENT} --query_id=42 --query='SELECT 3, count() FROM system.numbers' 2>&1 | grep -cF 'was cancelled' &
wait_for_query_to_start '42'
( ${CLICKHOUSE_CLIENT} --query_id=42 --replace_running_query=1 --queue_max_wait_ms=500 --query='SELECT 43' ||: ) 2>&1 | grep -F "can't be stopped" > /dev/null
${CLICKHOUSE_CLIENT} --query_id=42 --replace_running_query=1 --queue_max_wait_ms=500 --query='SELECT 43' 2>&1 | grep -F "can't be stopped" > /dev/null
${CLICKHOUSE_CLIENT} --query_id=42 --replace_running_query=1 --query='SELECT 44'
wait

View File

@ -0,0 +1,69 @@
Zero offset
0 0
1 1
2 2
Nullable values
\N 0 \N
\N 1 2
2 2 \N
Result with different type
0 1
1 2
2 -10
Offset > block
0 0
1 0
2 0
Abs(Offset) > block
0 0
1 0
2 0
Positive offset
0 1
1 2
2 0
Negative offset
0 1
1 2
2 0
Positive offset with defaults
0 2
1 3
2 12
3 13
Negative offset with defaults
0 10
1 11
2 0
3 1
Positive offset with const defaults
0 1
1 2
2 1000
Negative offset with const defaults
0 1000
1 0
2 1
Dynamic column and offset, out of bounds
0 0 0
1 2 3
2 4 20
3 6 30
Dynamic column and offset, negative
0 0 0
1 -2 10
2 -4 20
3 -6 30
4 -8 40
5 -10 50
Dynamic column and offset, without defaults
0 4 4
1 2 3
2 0 2
3 -2 1
4 -4 0
5 -6 0
Constant column
0 1000
1 1000
2 1000

View File

@ -0,0 +1,42 @@
-- no arguments
select neighbour(); -- { serverError 42 }
-- single argument
select neighbour(1); -- { serverError 42 }
-- greater than 3 arguments
select neighbour(1,2,3,4); -- { serverError 42 }
-- bad default value
select neighbour(dummy, 1, 'hello'); -- { serverError 43 }
-- types without common supertype (UInt64 and Int8)
select number, neighbour(number, 1, -10) from numbers(3); -- { serverError 43 }
-- nullable offset is not allowed
select number, if(number > 1, number, null) as offset, neighbour(number, offset) from numbers(3); -- { serverError 43 }
select 'Zero offset';
select number, neighbour(number, 0) from numbers(3);
select 'Nullable values';
select if(number > 1, number, null) as value, number as offset, neighbour(value, offset) as neighbour from numbers(3);
select 'Result with different type';
select toInt32(number) as n, neighbour(n, 1, -10) from numbers(3);
select 'Offset > block';
select number, neighbour(number, 10) from numbers(3);
select 'Abs(Offset) > block';
select number, neighbour(number, -10) from numbers(3);
select 'Positive offset';
select number, neighbour(number, 1) from numbers(3);
select 'Negative offset';
select number, neighbour(number, 1) from numbers(3);
select 'Positive offset with defaults';
select number, neighbour(number, 2, number + 10) from numbers(4);
select 'Negative offset with defaults';
select number, neighbour(number, -2, number + 10) from numbers(4);
select 'Positive offset with const defaults';
select number, neighbour(number, 1, 1000) from numbers(3);
select 'Negative offset with const defaults';
select number, neighbour(number, -1, 1000) from numbers(3);
select 'Dynamic column and offset, out of bounds';
select number, number * 2 as offset, neighbour(number, offset, number * 10) from numbers(4);
select 'Dynamic column and offset, negative';
select number, -number * 2 as offset, neighbour(number, offset, number * 10) from numbers(6);
select 'Dynamic column and offset, without defaults';
select number, -(number - 2) * 2 as offset, neighbour(number, offset) from numbers(6);
select 'Constant column';
select number, neighbour(1000, 10) from numbers(3);

View File

@ -1,42 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv EVENTS')
client1.expect('1.*' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect('2.*' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client1.expect('3.*' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,18 +0,0 @@
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv LIMIT 0;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,42 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(r'6.*2' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client1.expect(r'21.*3' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,49 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET temporary_live_view_timeout=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client2.expect(prompt)
client1.expect(r'6.*2' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client2.expect(prompt)
client1.expect(r'21.*3' + end_of_block)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('SELECT sleep(1)')
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect('Table test.lv doesn\'t exist')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,44 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET live_view_heartbeat_interval=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv EVENTS')
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect('2.*' + end_of_block)
client1.expect('Progress: 2.00 rows.*\)')
# wait for heartbeat
client1.expect('Progress: 2.00 rows.*\)')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,45 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET live_view_heartbeat_interval=1')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(r'6.*2' + end_of_block)
client1.expect('Progress: 2.00 rows.*\)')
# wait for heartbeat
client1.expect('Progress: 2.00 rows.*\)')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,37 +0,0 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url': '/?query=WATCH%20test.lv%20EVENTS'}, name='client2>', log=log) as client2:
client2.expect('.*1\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('.*2\n')
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,37 +0,0 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url':'/?query=WATCH%20test.lv'}, name='client2>', log=log) as client2:
client2.expect('.*0\t1\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('.*6\t2\n')
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,4 +0,0 @@
{"row":{"a":1}}
{"row":{"a":2}}
{"row":{"a":3}}
{"progress":{"read_rows":"3","read_bytes":"36","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}

View File

@ -1,12 +0,0 @@
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT * FROM test.lv FORMAT JSONEachRowWithProgress;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,6 +0,0 @@
{"row":{"sum(a)":"0","_version":"1"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
{"row":{"sum(a)":"6","_version":"2"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}
{"row":{"sum(a)":"21","_version":"3"}}
{"progress":{"read_rows":"1","read_bytes":"16","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}

View File

@ -1,18 +0,0 @@
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv LIMIT 0 FORMAT JSONEachRowWithProgress;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,43 +0,0 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url': '/?live_view_heartbeat_interval=1&query=WATCH%20test.lv%20EVENTS%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2:
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}\n', escape=True)
client2.expect('{"row":{"version":"1"}', escape=True)
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True)
# heartbeat is provided by progress message
client2.expect('{"progress":{"read_rows":"1","read_bytes":"8","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}}', escape=True)
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('{"row":{"version":"2"}}\n', escape=True)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,43 +0,0 @@
#!/usr/bin/env python
import os
import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
from httpclient import client as http_client
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1:
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
with http_client({'method':'GET', 'url':'/?live_view_heartbeat_interval=1&query=WATCH%20test.lv%20FORMAT%20JSONEachRowWithProgress'}, name='client2>', log=log) as client2:
client2.expect('"progress".*',)
client2.expect('{"row":{"sum(a)":"0","_version":"1"}}\n', escape=True)
client2.expect('"progress".*\n')
# heartbeat is provided by progress message
client2.expect('"progress".*\n')
client1.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(prompt)
client2.expect('"progress".*"read_rows":"2".*\n')
client2.expect('{"row":{"sum(a)":"6","_version":"2"}}\n', escape=True)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,7 +0,0 @@
DROP TABLE IF EXISTS test.lv;
CREATE LIVE VIEW test.lv AS SELECT 1;
SELECT * FROM test.lv;
DROP TABLE test.lv;

View File

@ -1,4 +0,0 @@
6 1
6 1
12 2
12 2

View File

@ -1,18 +0,0 @@
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT *,_version FROM test.lv;
SELECT *,_version FROM test.lv;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT *,_version FROM test.lv;
SELECT *,_version FROM test.lv;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,16 +0,0 @@
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT sum(a) FROM test.lv;
INSERT INTO test.mt VALUES (4),(5),(6);
SELECT sum(a) FROM test.lv;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,7 +0,0 @@
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,12 +0,0 @@
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT * FROM test.mt;
INSERT INTO test.mt VALUES (1),(2),(3);
SELECT *,_version FROM test.lv;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,18 +0,0 @@
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv EVENTS LIMIT 0;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv EVENTS LIMIT 0;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv EVENTS LIMIT 0;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,3 +0,0 @@
0 1
6 2
21 3

View File

@ -1,18 +0,0 @@
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (4),(5),(6);
WATCH test.lv LIMIT 0;
DROP TABLE test.lv;
DROP TABLE test.mt;

View File

@ -1,48 +0,0 @@
#!/usr/bin/env python
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
#log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv')
client1.expect(r'0.*1' + end_of_block)
client2.send('INSERT INTO test.mt VALUES (1),(2),(3)')
client1.expect(r'6.*2' + end_of_block)
client2.expect(prompt)
client2.send('INSERT INTO test.mt VALUES (4),(5),(6)')
client1.expect(r'21.*3' + end_of_block)
client2.expect(prompt)
for i in range(1,129):
client2.send('INSERT INTO test.mt VALUES (1)')
client1.expect(r'%d.*%d' % (21+i, 3+i) + end_of_block)
client2.expect(prompt)
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE test.lv')
client1.expect(prompt)
client1.send('DROP TABLE test.mt')
client1.expect(prompt)

View File

@ -1,3 +0,0 @@
temporary_live_view_timeout 5
live_view_heartbeat_interval 15
0

View File

@ -1,15 +0,0 @@
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
SELECT name, value from system.settings WHERE name = 'temporary_live_view_timeout';
SELECT name, value from system.settings WHERE name = 'live_view_heartbeat_interval';
SET temporary_live_view_timeout=1;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
SHOW TABLES LIKE 'lv';
SELECT sleep(2);
SHOW TABLES LIKE 'lv';
DROP TABLE test.mt;

View File

@ -1,81 +0,0 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
import signal
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_query_in_process_group(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
def read_lines_and_push_to_queue(pipe, queue):
try:
for line in iter(pipe.readline, ''):
line = line.strip()
print(line)
sys.stdout.flush()
queue.put(line)
except KeyboardInterrupt:
pass
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
p = send_query_in_process_group('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
thread.start()
line = q.get()
print(line)
assert (line == '0\t1')
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
print(line)
assert (line == '6\t2')
send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
line = q.get()
print(line)
assert (line == '21\t3')
# Send Ctrl+C to client.
os.killpg(os.getpgid(p.pid), signal.SIGINT)
# This insert shouldn't affect lv.
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
line = q.get()
print(line)
assert (line is None)
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -1,7 +0,0 @@
0 1
0 1
6 2
6 2
21 3
21 3
None

View File

@ -1,6 +0,0 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
python $CURDIR/00991_live_view_watch_event_live.python

View File

@ -1,63 +0,0 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_http_query(query):
cmd = list(CLICKHOUSE_CURL.split()) # list(['curl', '-sSN', '--max-time', '10'])
cmd += ['-sSN', CLICKHOUSE_URL, '-d', query]
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def read_lines_and_push_to_queue(pipe, queue):
for line in iter(pipe.readline, ''):
line = line.strip()
print(line)
sys.stdout.flush()
queue.put(line)
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
pipe = send_http_query('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(pipe, q))
thread.start()
line = q.get()
print(line)
assert (line == '0\t1')
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
print(line)
assert (line == '6\t2')
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -1,4 +0,0 @@
0 1
0 1
6 2
6 2

View File

@ -1,6 +0,0 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
python $CURDIR/00991_live_view_watch_http.python

View File

@ -1,83 +0,0 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
import signal
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_query_in_process_group(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query, '--live_view_heartbeat_interval=1', '--progress']
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
def read_lines_and_push_to_queue(pipe, queue):
try:
for line in iter(pipe.readline, ''):
line = line.strip()
# print(line)
sys.stdout.flush()
queue.put(line)
except KeyboardInterrupt:
pass
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
p = send_query_in_process_group('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
thread.start()
line = q.get()
# print(line)
assert (line.endswith('0\t1'))
assert ('Progress: 0.00 rows' in line)
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
assert (line.endswith('6\t2'))
assert ('Progress: 1.00 rows' in line)
# send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
# line = q.get()
# print(line)
# assert (line.endswith('6\t2'))
# assert ('Progress: 1.00 rows' in line)
# Send Ctrl+C to client.
os.killpg(os.getpgid(p.pid), signal.SIGINT)
# This insert shouldn't affect lv.
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
line = q.get()
# print(line)
# assert (line is None)
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -1,6 +0,0 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
python $CURDIR/00991_temporary_live_view_watch_events_heartbeat.python

View File

@ -1,81 +0,0 @@
#!/usr/bin/env python
import subprocess
import threading
import Queue as queue
import os
import sys
import signal
CLICKHOUSE_CLIENT = os.environ.get('CLICKHOUSE_CLIENT')
CLICKHOUSE_CURL = os.environ.get('CLICKHOUSE_CURL')
CLICKHOUSE_URL = os.environ.get('CLICKHOUSE_URL')
def send_query(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout
def send_query_in_process_group(query):
cmd = list(CLICKHOUSE_CLIENT.split())
cmd += ['--query', query]
# print(cmd)
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=os.setsid)
def read_lines_and_push_to_queue(pipe, queue):
try:
for line in iter(pipe.readline, ''):
line = line.strip()
print(line)
sys.stdout.flush()
queue.put(line)
except KeyboardInterrupt:
pass
queue.put(None)
def test():
send_query('DROP TABLE IF EXISTS test.lv').read()
send_query('DROP TABLE IF EXISTS test.mt').read()
send_query('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()').read()
send_query('CREATE TEMPORARY LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt').read()
q = queue.Queue()
p = send_query_in_process_group('WATCH test.lv')
thread = threading.Thread(target=read_lines_and_push_to_queue, args=(p.stdout, q))
thread.start()
line = q.get()
print(line)
assert (line == '0\t1')
send_query('INSERT INTO test.mt VALUES (1),(2),(3)').read()
line = q.get()
print(line)
assert (line == '6\t2')
send_query('INSERT INTO test.mt VALUES (4),(5),(6)').read()
line = q.get()
print(line)
assert (line == '21\t3')
# Send Ctrl+C to client.
os.killpg(os.getpgid(p.pid), signal.SIGINT)
# This insert shouldn't affect lv.
send_query('INSERT INTO test.mt VALUES (7),(8),(9)').read()
line = q.get()
print(line)
assert (line is None)
send_query('DROP TABLE if exists test.lv').read()
send_query('DROP TABLE if exists test.lv').read()
thread.join()
test()

View File

@ -1,7 +0,0 @@
0 1
0 1
6 2
6 2
21 3
21 3
None

View File

@ -1,6 +0,0 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
python $CURDIR/00991_temporary_live_view_watch_live.python

Some files were not shown because too many files have changed in this diff Show More