ClickHouse/src/Storages/StorageBuffer.cpp

1168 lines
44 KiB
C++
Raw Normal View History

#include <boost/range/algorithm_ext/erase.hpp>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/evaluateConstantExpression.h>
2021-02-05 11:41:44 +00:00
#include <Interpreters/addMissingDefaults.h>
#include <Interpreters/getColumnFromBlock.h>
#include <Storages/StorageBuffer.h>
#include <Storages/StorageFactory.h>
2018-12-25 23:14:39 +00:00
#include <Storages/AlterCommands.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTrackerBlockerInThread.h>
2021-06-14 04:13:35 +00:00
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
2018-06-05 19:46:49 +00:00
#include <Common/ProfileEvents.h>
2021-10-02 07:13:14 +00:00
#include <base/logger_useful.h>
#include <base/getThreadId.h>
#include <base/range.h>
2020-11-17 17:16:55 +00:00
#include <Processors/QueryPlan/ExpressionStep.h>
2020-01-29 18:14:40 +00:00
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
2021-07-23 14:25:35 +00:00
#include <Processors/Sinks/SinkToStorage.h>
2021-10-13 18:22:02 +00:00
#include <Processors/Sources/SourceWithProgress.h>
2021-09-08 18:29:38 +00:00
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/UnionStep.h>
2021-03-04 17:38:12 +00:00
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
2021-07-17 18:06:46 +00:00
namespace ProfileEvents
{
    /// Flush-related counters (presumably incremented by the flush logic of this storage — confirm).
    extern const Event StorageBufferFlush;
    extern const Event StorageBufferErrorOnFlush;

    /// Threshold-related counters (names suggest which threshold was passed; uses are not in this chunk).
    extern const Event StorageBufferPassedAllMinThresholds;
    extern const Event StorageBufferPassedTimeMaxThreshold;
    extern const Event StorageBufferPassedRowsMaxThreshold;
    extern const Event StorageBufferPassedBytesMaxThreshold;
    extern const Event StorageBufferPassedTimeFlushThreshold;
    extern const Event StorageBufferPassedRowsFlushThreshold;
    extern const Event StorageBufferPassedBytesFlushThreshold;

    /// Milliseconds spent waiting for a buffer's mutex; incremented in Buffer::lockImpl().
    extern const Event StorageBufferLayerLockReadersWaitMilliseconds;
    extern const Event StorageBufferLayerLockWritersWaitMilliseconds;
}
namespace CurrentMetrics
{
    /// Rows and bytes accounted for data held in Buffer tables; appendBlock() adds to both.
    extern const Metric StorageBufferBytes;
    extern const Metric StorageBufferRows;
}
namespace DB
{
namespace ErrorCodes
{
    /// Error codes used by this translation unit (defined elsewhere).
    extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
    extern const int BAD_ARGUMENTS;
    extern const int INFINITE_LOOP;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
/// Blocking lock of the buffer mutex on behalf of a reader.
/// Wait time is accounted as StorageBufferLayerLockReadersWaitMilliseconds by lockImpl().
std::unique_lock<std::mutex> StorageBuffer::Buffer::lockForReading() const
{
    constexpr bool for_reading = true;
    return lockImpl(for_reading);
}
/// Blocking lock of the buffer mutex on behalf of a writer.
/// Wait time is accounted as StorageBufferLayerLockWritersWaitMilliseconds by lockImpl().
std::unique_lock<std::mutex> StorageBuffer::Buffer::lockForWriting() const
{
    constexpr bool for_reading = false;
    return lockImpl(for_reading);
}
/// Non-blocking attempt to lock the buffer mutex.
/// The returned lock owns the mutex only if it was free; check owns_lock() on the result.
/// No wait-time profile event is recorded here.
std::unique_lock<std::mutex> StorageBuffer::Buffer::tryLock() const
{
    return std::unique_lock<std::mutex>(mutex, std::try_to_lock);
}
std::unique_lock<std::mutex> StorageBuffer::Buffer::lockImpl(bool read) const
{
std::unique_lock lock(mutex, std::defer_lock);
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
lock.lock();
UInt64 elapsed = watch.elapsedMilliseconds();
if (read)
ProfileEvents::increment(ProfileEvents::StorageBufferLayerLockReadersWaitMilliseconds, elapsed);
else
ProfileEvents::increment(ProfileEvents::StorageBufferLayerLockWritersWaitMilliseconds, elapsed);
return lock;
}
2019-12-04 16:06:55 +00:00
/// Creates a Buffer table with `num_shards_` independent in-memory buffers.
/// If no columns are given, the structure is taken from the destination table,
/// which is looked up (and must exist) at construction time.
/// The background flush task is created here but not scheduled.
StorageBuffer::StorageBuffer(
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    ContextPtr context_,
    size_t num_shards_,
    const Thresholds & min_thresholds_,
    const Thresholds & max_thresholds_,
    const Thresholds & flush_thresholds_,
    const StorageID & destination_id_,
    bool allow_materialized_)
    : IStorage(table_id_)
    , WithContext(context_->getBufferContext())
    , num_shards(num_shards_)
    , buffers(num_shards_)
    , min_thresholds(min_thresholds_)
    , max_thresholds(max_thresholds_)
    , flush_thresholds(flush_thresholds_)
    , destination_id(destination_id_)
    , allow_materialized(allow_materialized_)
    , log(&Poco::Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
    , bg_pool(getContext()->getBufferFlushSchedulePool())
{
    StorageInMemoryMetadata metadata;
    if (!columns_.empty())
    {
        metadata.setColumns(columns_);
    }
    else
    {
        /// No explicit structure: inherit the columns of the destination table.
        const auto destination_table = DatabaseCatalog::instance().getTable(destination_id, context_);
        metadata.setColumns(destination_table->getInMemoryMetadataPtr()->getColumns());
    }
    metadata.setConstraints(constraints_);
    metadata.setComment(comment);
    setInMemoryMetadata(metadata);

    /// The task handle is assigned in the constructor (not in startup()) so that it is already
    /// set before any other thread can observe the object.
    flush_handle = bg_pool.createTask(log->name() + "/Bg", [this] { backgroundFlush(); });
}
2017-03-12 19:18:07 +00:00
/// Reads from one buffer (from one block) under its mutex.
2020-01-29 18:14:40 +00:00
class BufferSource : public SourceWithProgress
{
public:
BufferSource(const Names & column_names_, StorageBuffer::Buffer & buffer_, const StorageSnapshotPtr & storage_snapshot)
: SourceWithProgress(storage_snapshot->getSampleBlockForColumns(column_names_))
, column_names_and_types(storage_snapshot->getColumnsByNames(
GetColumnsOptions(GetColumnsOptions::All).withSubcolumns(), column_names_))
, buffer(buffer_) {}
String getName() const override { return "Buffer"; }
protected:
2020-01-29 18:14:40 +00:00
Chunk generate() override
{
2020-01-29 18:14:40 +00:00
Chunk res;
if (has_been_read)
return res;
has_been_read = true;
std::unique_lock lock(buffer.lockForReading());
if (!buffer.data.rows())
return res;
2020-01-29 18:14:40 +00:00
Columns columns;
2020-12-07 19:02:26 +00:00
columns.reserve(column_names_and_types.size());
2020-01-29 18:14:40 +00:00
2020-12-07 19:02:26 +00:00
for (const auto & elem : column_names_and_types)
columns.emplace_back(getColumnFromBlock(buffer.data, elem));
2020-01-29 18:14:40 +00:00
UInt64 size = columns.at(0)->size();
res.setColumns(std::move(columns), size);
return res;
}
private:
2020-12-07 19:02:26 +00:00
NamesAndTypesList column_names_and_types;
StorageBuffer::Buffer & buffer;
bool has_been_read = false;
};
/// Without a destination table only the in-memory buffers are read, so columns are fetched as is.
/// With a destination, the decision is delegated to the destination table; projections are
/// disabled because Buffer cannot apply them to its in-memory data.
/// Throws INFINITE_LOOP if the destination is this very table.
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(
    ContextPtr local_context,
    QueryProcessingStage::Enum to_stage,
    const StorageSnapshotPtr &,
    SelectQueryInfo & query_info) const
{
    if (!destination_id)
        return QueryProcessingStage::FetchColumns;

    const auto destination = DatabaseCatalog::instance().getTable(destination_id, local_context);
    if (destination.get() == this)
        throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);

    /// TODO: Find a way to support projections for StorageBuffer
    query_info.ignore_projections = true;

    const auto & destination_metadata = destination->getInMemoryMetadataPtr();
    const auto destination_snapshot = destination->getStorageSnapshot(destination_metadata, local_context);
    return destination->getQueryProcessingStage(local_context, to_stage, destination_snapshot, query_info);
}
2020-01-30 10:22:59 +00:00
2020-08-03 13:54:14 +00:00
/// Pipe-returning overload: builds a QueryPlan with the QueryPlan overload of read()
/// and converts it to a Pipe using optimization and build settings from the query context.
Pipe StorageBuffer::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    const unsigned num_streams)
{
    QueryPlan query_plan;
    read(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);

    const auto optimization_settings = QueryPlanOptimizationSettings::fromContext(local_context);
    const auto pipeline_settings = BuildQueryPipelineSettings::fromContext(local_context);
    return query_plan.convertToPipe(optimization_settings, pipeline_settings);
}
/// Reads from the Buffer table: data from the destination table (if any) united with the data
/// currently held in the in-memory buffers.
///
/// Destination part (only if destination_id is set; throws INFINITE_LOOP if it is this table):
/// - if every requested column/subcolumn exists in the destination with an equal type, the
///   destination is read directly;
/// - otherwise only columns physically present in the destination are read, columns missing there
///   are filled via addMissingDefaults(), and the result is converted to the Buffer table header.
/// The destination share lock is handed to a SettingQuotaAndLimitsStep so it lives as long as the plan.
///
/// Buffer part: one BufferSource per shard. If processed_stage > FetchColumns, the buffer data is
/// processed up to the same stage with InterpreterSelectQuery; otherwise PREWHERE (alias actions,
/// row-level filter, prewhere filter) is applied here.
///
/// Finally, the destination header is converted to the buffers' header if they differ and both
/// plans are united with a UnionStep.
void StorageBuffer::read(
QueryPlan & query_plan,
2020-01-30 10:26:25 +00:00
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
2020-01-30 10:26:25 +00:00
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
const auto & metadata_snapshot = storage_snapshot->metadata;
2020-02-17 19:28:25 +00:00
if (destination_id)
{
auto destination = DatabaseCatalog::instance().getTable(destination_id, local_context);
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
/// Held until it is moved into the SettingQuotaAndLimitsStep below (or released at scope end
/// if nothing was read from the destination).
auto destination_lock = destination->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
2020-06-16 18:41:11 +00:00
auto destination_metadata_snapshot = destination->getInMemoryMetadataPtr();
auto destination_snapshot = destination->getStorageSnapshot(destination_metadata_snapshot, local_context);
2020-06-16 18:41:11 +00:00
/// True if every requested column (or subcolumn) exists among the destination's physical
/// columns with a type equal to ours.
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [metadata_snapshot, destination_metadata_snapshot](const String& column_name)
{
const auto & dest_columns = destination_metadata_snapshot->getColumns();
const auto & our_columns = metadata_snapshot->getColumns();
auto dest_columm = dest_columns.tryGetColumnOrSubcolumn(GetColumnsOptions::AllPhysical, column_name);
return dest_columm && dest_columm->type->equals(*our_columns.getColumnOrSubcolumn(GetColumnsOptions::AllPhysical, column_name).type);
});
if (dst_has_same_structure)
{
2020-05-13 13:49:10 +00:00
if (query_info.order_optimizer)
query_info.input_order_info = query_info.order_optimizer->getInputOrder(destination_metadata_snapshot, local_context);
/// The destination table has the same structure of the requested columns and we can simply read blocks from there.
destination->read(
query_plan, column_names, destination_snapshot, query_info,
local_context, processed_stage, max_block_size, num_streams);
}
else
{
/// There is a struct mismatch and we need to convert read blocks from the destination table.
/// Target header: the full sample block of the Buffer table.
const Block header = metadata_snapshot->getSampleBlock();
/// Requested columns that physically exist in the destination; only these are read from it.
Names columns_intersection = column_names;
/// Our header, but with the destination's type for columns whose types differ;
/// used as the expected structure when adding missing defaults.
Block header_after_adding_defaults = header;
const auto & dest_columns = destination_metadata_snapshot->getColumns();
const auto & our_columns = metadata_snapshot->getColumns();
for (const String & column_name : column_names)
{
if (!dest_columns.hasPhysical(column_name))
{
2020-05-23 22:24:01 +00:00
LOG_WARNING(log, "Destination table {} doesn't have column {}. The default values are used.", destination_id.getNameForLogs(), backQuoteIfNeed(column_name));
boost::range::remove_erase(columns_intersection, column_name);
continue;
}
2020-04-27 15:38:35 +00:00
const auto & dst_col = dest_columns.getPhysical(column_name);
const auto & col = our_columns.getPhysical(column_name);
if (!dst_col.type->equals(*col.type))
{
2020-05-23 22:24:01 +00:00
LOG_WARNING(log, "Destination table {} has different type of column {} ({} != {}). Data from destination table are converted.", destination_id.getNameForLogs(), backQuoteIfNeed(column_name), dst_col.type->getName(), col.type->getName());
header_after_adding_defaults.getByName(column_name) = ColumnWithTypeAndName(dst_col.type, column_name);
}
}
if (columns_intersection.empty())
{
2020-05-23 22:24:01 +00:00
LOG_WARNING(log, "Destination table {} has no common columns with block in buffer. Block of data is skipped.", destination_id.getNameForLogs());
}
else
{
destination->read(
query_plan, columns_intersection, destination_snapshot, query_info,
local_context, processed_stage, max_block_size, num_streams);
2020-10-06 08:21:05 +00:00
/// The destination may produce no plan; only then are the conversion steps skipped.
if (query_plan.isInitialized())
{
2020-10-06 08:21:05 +00:00
2021-02-05 11:41:44 +00:00
/// Step 1: add the columns absent in the destination, using defaults from our metadata.
auto actions = addMissingDefaults(
query_plan.getCurrentDataStream().header,
header_after_adding_defaults.getNamesAndTypesList(),
metadata_snapshot->getColumns(),
local_context);
2021-02-05 11:41:44 +00:00
auto adding_missed = std::make_unique<ExpressionStep>(
2020-10-06 08:21:05 +00:00
query_plan.getCurrentDataStream(),
2021-03-04 17:38:12 +00:00
std::move(actions));
2020-10-06 08:21:05 +00:00
adding_missed->setStepDescription("Add columns missing in destination table");
query_plan.addStep(std::move(adding_missed));
2020-11-17 17:16:55 +00:00
/// Step 2: cast the result to the Buffer table header, matching columns by name.
auto actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
2021-03-04 17:38:12 +00:00
auto converting = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), actions_dag);
2020-10-06 08:21:05 +00:00
converting->setStepDescription("Convert destination table columns to Buffer table structure");
query_plan.addStep(std::move(converting));
}
}
}
if (query_plan.isInitialized())
{
/// Default-constructed limits and null trailing arguments: the step is used here to keep
/// destination_lock alive for the lifetime of the plan rather than to enforce limits.
StreamLocalLimits limits;
SizeLimits leaf_limits;
/// Add table lock for destination table.
auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
query_plan.getCurrentDataStream(),
destination,
std::move(destination_lock),
limits,
leaf_limits,
nullptr,
nullptr);
adding_limits_and_quota->setStepDescription("Lock destination table for Buffer");
query_plan.addStep(std::move(adding_limits_and_quota));
}
}
2020-08-03 13:54:14 +00:00
/// One BufferSource per shard, united into a single pipe. Each source takes its buffer's
/// mutex only when it generates its chunk.
Pipe pipe_from_buffers;
{
Pipes pipes_from_buffers;
pipes_from_buffers.reserve(num_shards);
for (auto & buf : buffers)
pipes_from_buffers.emplace_back(std::make_shared<BufferSource>(column_names, buf, storage_snapshot));
2020-08-03 13:54:14 +00:00
pipe_from_buffers = Pipe::unitePipes(std::move(pipes_from_buffers));
}
/// No buffer sources were created: the result is whatever the destination part produced.
if (pipe_from_buffers.empty())
return;
QueryPlan buffers_plan;
/** If the sources from the table were processed before some non-initial stage of query execution,
* then sources from the buffers must also be wrapped in the processing pipeline before the same stage.
*/
if (processed_stage > QueryProcessingStage::FetchColumns)
{
/// TODO: Find a way to support projections for StorageBuffer
auto interpreter = InterpreterSelectQuery(
query_info.query, local_context, std::move(pipe_from_buffers),
SelectQueryOptions(processed_stage).ignoreProjections());
interpreter.buildQueryPlan(buffers_plan);
}
else
{
/// Buffer data is read at FetchColumns stage, so PREWHERE has to be applied here:
/// alias actions first, then the row-level filter, then the PREWHERE filter itself.
if (query_info.prewhere_info)
{
2021-06-25 14:49:28 +00:00
auto actions_settings = ExpressionActionsSettings::fromContext(local_context);
2021-02-15 19:48:06 +00:00
if (query_info.prewhere_info->alias_actions)
2020-08-03 13:54:14 +00:00
{
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
2021-02-15 19:48:06 +00:00
return std::make_shared<ExpressionTransform>(
header,
2021-06-25 14:49:28 +00:00
std::make_shared<ExpressionActions>(query_info.prewhere_info->alias_actions, actions_settings));
});
}
2021-02-15 19:48:06 +00:00
if (query_info.prewhere_info->row_level_filter)
{
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
2021-02-15 19:48:06 +00:00
return std::make_shared<FilterTransform>(
header,
2021-06-25 14:49:28 +00:00
std::make_shared<ExpressionActions>(query_info.prewhere_info->row_level_filter, actions_settings),
2021-02-15 19:48:06 +00:00
query_info.prewhere_info->row_level_column_name,
false);
});
}
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header,
2021-06-25 14:49:28 +00:00
std::make_shared<ExpressionActions>(query_info.prewhere_info->prewhere_actions, actions_settings),
query_info.prewhere_info->prewhere_column_name,
query_info.prewhere_info->remove_prewhere_column);
});
}
auto read_from_buffers = std::make_unique<ReadFromPreparedSource>(std::move(pipe_from_buffers));
read_from_buffers->setStepDescription("Read from buffers of Buffer table");
buffers_plan.addStep(std::move(read_from_buffers));
}
/// Nothing was read from the destination: the buffers plan is the whole result.
if (!query_plan.isInitialized())
{
query_plan = std::move(buffers_plan);
return;
}
auto result_header = buffers_plan.getCurrentDataStream().header;
/// Convert structure from table to structure from buffer.
if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
{
2020-11-17 17:16:55 +00:00
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
result_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto converting = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), convert_actions_dag);
query_plan.addStep(std::move(converting));
}
/// Unite the destination plan and the buffers plan; query_plan is reset and becomes the union.
DataStreams input_streams;
input_streams.emplace_back(query_plan.getCurrentDataStream());
input_streams.emplace_back(buffers_plan.getCurrentDataStream());
std::vector<std::unique_ptr<QueryPlan>> plans;
plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
plans.emplace_back(std::make_unique<QueryPlan>(std::move(buffers_plan)));
query_plan = QueryPlan();
2021-03-25 09:57:14 +00:00
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
union_step->setStepDescription("Unite sources from Buffer table");
query_plan.unitePlans(std::move(union_step), std::move(plans));
}
static void appendBlock(const Block & from, Block & to)
{
size_t rows = from.rows();
size_t old_rows = to.rows();
size_t old_bytes = to.bytes();
if (!to)
to = from.cloneEmpty();
assertBlocksHaveEqualStructure(from, to, "Buffer");
from.checkNumberOfRows();
to.checkNumberOfRows();
Fix NULL dereference in Buffer rollback <details> Stacktrace: ``` (gdb) bt 0 DB::appendBlock (from=..., to=...) at ../src/Storages/StorageBuffer.cpp:411 1 DB::BufferBlockOutputStream::insertIntoBuffer (this=<optimized out>, block=..., buffer=...) at ../src/Storages/StorageBuffer.cpp:541 2 0x000000000f2e9d5f in DB::BufferBlockOutputStream::write (this=<optimized out>, block=...) at ../src/Storages/StorageBuffer.cpp:508 3 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f74660faa18, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160 4 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f74660f1b18, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10 5 0x000000000ec483ac in DB::SquashingBlockOutputStream::finalize (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:30 6 0x000000000ec48429 in DB::SquashingBlockOutputStream::writeSuffix (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:50 7 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74660f8258) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280 8 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74b7ddea18) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280 9 0x000000000f2e6748 in DB::StorageBuffer::writeBlockToDestination (this=<optimized out>, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:820 10 0x000000000f2ea00b in DB::BufferBlockOutputStream::write (this=0x7f7574e11748, block=...) at ../src/Storages/StorageBuffer.cpp:469 11 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f7574ed3658, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160 12 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f7574e84518, block=...) 
at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10 13 0x000000000ec482f4 in DB::SquashingBlockOutputStream::write (this=0x7f7574e84718, block=...) at ../src/DataStreams/SquashingBlockOutputStream.cpp:17 14 0x000000000ebe8bce in DB::CountingBlockOutputStream::write (this=0x7f7574ed3720, block=...) at ../src/DataStreams/CountingBlockOutputStream.cpp:17 15 0x000000000f68e834 in DB::TCPHandler::receiveData (this=<optimized out>, scalar=<optimized out>) at ../src/Server/TCPHandler.cpp:1168 16 0x000000000f68737c in DB::TCPHandler::receivePacket (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:918 17 0x000000000f688d2f in DB::TCPHandler::readDataNext (this=0x7f7574f17000, poll_interval=@0x7f6f1dff1f78: 10000000, receive_timeout=@0x7f6f1dff1f68: 300) at ../src/Server/TCPHandler.cpp:460 18 0x000000000f6878be in DB::TCPHandler::readData (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:490 19 DB::TCPHandler::processInsertQuery (this=0x7f7574f17000, connection_settings=...) 
at ../src/Server/TCPHandler.cpp:519 20 0x000000000f680ab9 in DB::TCPHandler::runImpl (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:268 21 0x000000000f68f297 in DB::TCPHandler::run (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:1414 22 0x0000000011fb81cf in Poco::Net::TCPServerConnection::start (this=0x0) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 23 0x0000000011fb9be1 in Poco::Net::TCPServerDispatcher::run (this=0x7f752ab5fd00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:112 24 0x00000000120e71c9 in Poco::PooledThread::run (this=0x7f747d3a4580) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 25 0x00000000120e315a in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/src/Thread_POSIX.cpp:345 26 0x00007f760620aea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 27 0x00007f760613aeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 (gdb) p to.data.__end_-to.data.__begin_ $17 = 10 (gdb) p to.data.__begin_[9].column.px $19 = (const DB::IColumn *) 0x7f7328392720 (gdb) p to.data.__begin_[8].column.px $20 = (const DB::IColumn *) 0x0 (gdb) p to.data.__begin_[7].column.px $21 = (const DB::IColumn *) 0x7f746f33d360 ``` Line numbers matched with this version - https://github.com/azat/ClickHouse/blob/f0e7cb16a729e2fbddd4f5804a48f7ea8928581b/src/Storages/StorageBuffer.cpp#L411 </details>
2020-12-21 20:48:18 +00:00
MutableColumnPtr last_col;
try
{
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
2021-10-22 16:25:48 +00:00
{
const IColumn & col_from = *from.getByPosition(column_no).column.get();
last_col = IColumn::mutate(std::move(to.getByPosition(column_no).column));
/// In case of ColumnAggregateFunction aggregate states will
/// be allocated from the query context but can be destroyed from the
/// server context (in case of background flush), and thus memory
/// will be leaked from the query, but only tracked memory, not
/// memory itself.
///
/// To avoid this, prohibit sharing the aggregate states.
last_col->ensureOwnership();
last_col->insertRangeFrom(col_from, 0, rows);
to.getByPosition(column_no).column = std::move(last_col);
}
CurrentMetrics::add(CurrentMetrics::StorageBufferRows, rows);
CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, to.bytes() - old_bytes);
}
catch (...)
{
/// Rollback changes.
/// In case of rollback, it is better to ignore memory limits instead of abnormal server termination.
/// So ignore any memory limits, even global (since memory tracking has drift).
MemoryTrackerBlockerInThread temporarily_ignore_any_memory_limits(VariableContext::Global);
try
{
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
{
ColumnPtr & col_to = to.getByPosition(column_no).column;
Fix NULL dereference in Buffer rollback <details> Stacktrace: ``` (gdb) bt 0 DB::appendBlock (from=..., to=...) at ../src/Storages/StorageBuffer.cpp:411 1 DB::BufferBlockOutputStream::insertIntoBuffer (this=<optimized out>, block=..., buffer=...) at ../src/Storages/StorageBuffer.cpp:541 2 0x000000000f2e9d5f in DB::BufferBlockOutputStream::write (this=<optimized out>, block=...) at ../src/Storages/StorageBuffer.cpp:508 3 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f74660faa18, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160 4 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f74660f1b18, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10 5 0x000000000ec483ac in DB::SquashingBlockOutputStream::finalize (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:30 6 0x000000000ec48429 in DB::SquashingBlockOutputStream::writeSuffix (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:50 7 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74660f8258) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280 8 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74b7ddea18) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280 9 0x000000000f2e6748 in DB::StorageBuffer::writeBlockToDestination (this=<optimized out>, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:820 10 0x000000000f2ea00b in DB::BufferBlockOutputStream::write (this=0x7f7574e11748, block=...) at ../src/Storages/StorageBuffer.cpp:469 11 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f7574ed3658, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160 12 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f7574e84518, block=...) 
at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10 13 0x000000000ec482f4 in DB::SquashingBlockOutputStream::write (this=0x7f7574e84718, block=...) at ../src/DataStreams/SquashingBlockOutputStream.cpp:17 14 0x000000000ebe8bce in DB::CountingBlockOutputStream::write (this=0x7f7574ed3720, block=...) at ../src/DataStreams/CountingBlockOutputStream.cpp:17 15 0x000000000f68e834 in DB::TCPHandler::receiveData (this=<optimized out>, scalar=<optimized out>) at ../src/Server/TCPHandler.cpp:1168 16 0x000000000f68737c in DB::TCPHandler::receivePacket (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:918 17 0x000000000f688d2f in DB::TCPHandler::readDataNext (this=0x7f7574f17000, poll_interval=@0x7f6f1dff1f78: 10000000, receive_timeout=@0x7f6f1dff1f68: 300) at ../src/Server/TCPHandler.cpp:460 18 0x000000000f6878be in DB::TCPHandler::readData (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:490 19 DB::TCPHandler::processInsertQuery (this=0x7f7574f17000, connection_settings=...) 
at ../src/Server/TCPHandler.cpp:519 20 0x000000000f680ab9 in DB::TCPHandler::runImpl (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:268 21 0x000000000f68f297 in DB::TCPHandler::run (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:1414 22 0x0000000011fb81cf in Poco::Net::TCPServerConnection::start (this=0x0) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 23 0x0000000011fb9be1 in Poco::Net::TCPServerDispatcher::run (this=0x7f752ab5fd00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:112 24 0x00000000120e71c9 in Poco::PooledThread::run (this=0x7f747d3a4580) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 25 0x00000000120e315a in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/src/Thread_POSIX.cpp:345 26 0x00007f760620aea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 27 0x00007f760613aeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 (gdb) p to.data.__end_-to.data.__begin_ $17 = 10 (gdb) p to.data.__begin_[9].column.px $19 = (const DB::IColumn *) 0x7f7328392720 (gdb) p to.data.__begin_[8].column.px $20 = (const DB::IColumn *) 0x0 (gdb) p to.data.__begin_[7].column.px $21 = (const DB::IColumn *) 0x7f746f33d360 ``` Line numbers matched with this version - https://github.com/azat/ClickHouse/blob/f0e7cb16a729e2fbddd4f5804a48f7ea8928581b/src/Storages/StorageBuffer.cpp#L411 </details>
2020-12-21 20:48:18 +00:00
/// If there is no column, then the exception was thrown in the middle of append, in the insertRangeFrom()
if (!col_to)
{
Fix NULL dereference in Buffer rollback <details> Stacktrace: ``` (gdb) bt 0 DB::appendBlock (from=..., to=...) at ../src/Storages/StorageBuffer.cpp:411 1 DB::BufferBlockOutputStream::insertIntoBuffer (this=<optimized out>, block=..., buffer=...) at ../src/Storages/StorageBuffer.cpp:541 2 0x000000000f2e9d5f in DB::BufferBlockOutputStream::write (this=<optimized out>, block=...) at ../src/Storages/StorageBuffer.cpp:508 3 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f74660faa18, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160 4 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f74660f1b18, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10 5 0x000000000ec483ac in DB::SquashingBlockOutputStream::finalize (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:30 6 0x000000000ec48429 in DB::SquashingBlockOutputStream::writeSuffix (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:50 7 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74660f8258) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280 8 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74b7ddea18) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280 9 0x000000000f2e6748 in DB::StorageBuffer::writeBlockToDestination (this=<optimized out>, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:820 10 0x000000000f2ea00b in DB::BufferBlockOutputStream::write (this=0x7f7574e11748, block=...) at ../src/Storages/StorageBuffer.cpp:469 11 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f7574ed3658, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160 12 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f7574e84518, block=...) 
at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10 13 0x000000000ec482f4 in DB::SquashingBlockOutputStream::write (this=0x7f7574e84718, block=...) at ../src/DataStreams/SquashingBlockOutputStream.cpp:17 14 0x000000000ebe8bce in DB::CountingBlockOutputStream::write (this=0x7f7574ed3720, block=...) at ../src/DataStreams/CountingBlockOutputStream.cpp:17 15 0x000000000f68e834 in DB::TCPHandler::receiveData (this=<optimized out>, scalar=<optimized out>) at ../src/Server/TCPHandler.cpp:1168 16 0x000000000f68737c in DB::TCPHandler::receivePacket (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:918 17 0x000000000f688d2f in DB::TCPHandler::readDataNext (this=0x7f7574f17000, poll_interval=@0x7f6f1dff1f78: 10000000, receive_timeout=@0x7f6f1dff1f68: 300) at ../src/Server/TCPHandler.cpp:460 18 0x000000000f6878be in DB::TCPHandler::readData (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:490 19 DB::TCPHandler::processInsertQuery (this=0x7f7574f17000, connection_settings=...) 
at ../src/Server/TCPHandler.cpp:519 20 0x000000000f680ab9 in DB::TCPHandler::runImpl (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:268 21 0x000000000f68f297 in DB::TCPHandler::run (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:1414 22 0x0000000011fb81cf in Poco::Net::TCPServerConnection::start (this=0x0) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 23 0x0000000011fb9be1 in Poco::Net::TCPServerDispatcher::run (this=0x7f752ab5fd00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:112 24 0x00000000120e71c9 in Poco::PooledThread::run (this=0x7f747d3a4580) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 25 0x00000000120e315a in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/src/Thread_POSIX.cpp:345 26 0x00007f760620aea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 27 0x00007f760613aeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 (gdb) p to.data.__end_-to.data.__begin_ $17 = 10 (gdb) p to.data.__begin_[9].column.px $19 = (const DB::IColumn *) 0x7f7328392720 (gdb) p to.data.__begin_[8].column.px $20 = (const DB::IColumn *) 0x0 (gdb) p to.data.__begin_[7].column.px $21 = (const DB::IColumn *) 0x7f746f33d360 ``` Line numbers matched with this version - https://github.com/azat/ClickHouse/blob/f0e7cb16a729e2fbddd4f5804a48f7ea8928581b/src/Storages/StorageBuffer.cpp#L411 </details>
2020-12-21 20:48:18 +00:00
col_to = std::move(last_col);
/// Suppress clang-tidy [bugprone-use-after-move]
last_col = {};
}
Fix NULL dereference in Buffer rollback <details> Stacktrace: ``` (gdb) bt 0 DB::appendBlock (from=..., to=...) at ../src/Storages/StorageBuffer.cpp:411 1 DB::BufferBlockOutputStream::insertIntoBuffer (this=<optimized out>, block=..., buffer=...) at ../src/Storages/StorageBuffer.cpp:541 2 0x000000000f2e9d5f in DB::BufferBlockOutputStream::write (this=<optimized out>, block=...) at ../src/Storages/StorageBuffer.cpp:508 3 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f74660faa18, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160 4 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f74660f1b18, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10 5 0x000000000ec483ac in DB::SquashingBlockOutputStream::finalize (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:30 6 0x000000000ec48429 in DB::SquashingBlockOutputStream::writeSuffix (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:50 7 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74660f8258) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280 8 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74b7ddea18) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280 9 0x000000000f2e6748 in DB::StorageBuffer::writeBlockToDestination (this=<optimized out>, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:820 10 0x000000000f2ea00b in DB::BufferBlockOutputStream::write (this=0x7f7574e11748, block=...) at ../src/Storages/StorageBuffer.cpp:469 11 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f7574ed3658, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160 12 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f7574e84518, block=...) 
at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10 13 0x000000000ec482f4 in DB::SquashingBlockOutputStream::write (this=0x7f7574e84718, block=...) at ../src/DataStreams/SquashingBlockOutputStream.cpp:17 14 0x000000000ebe8bce in DB::CountingBlockOutputStream::write (this=0x7f7574ed3720, block=...) at ../src/DataStreams/CountingBlockOutputStream.cpp:17 15 0x000000000f68e834 in DB::TCPHandler::receiveData (this=<optimized out>, scalar=<optimized out>) at ../src/Server/TCPHandler.cpp:1168 16 0x000000000f68737c in DB::TCPHandler::receivePacket (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:918 17 0x000000000f688d2f in DB::TCPHandler::readDataNext (this=0x7f7574f17000, poll_interval=@0x7f6f1dff1f78: 10000000, receive_timeout=@0x7f6f1dff1f68: 300) at ../src/Server/TCPHandler.cpp:460 18 0x000000000f6878be in DB::TCPHandler::readData (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:490 19 DB::TCPHandler::processInsertQuery (this=0x7f7574f17000, connection_settings=...) 
at ../src/Server/TCPHandler.cpp:519 20 0x000000000f680ab9 in DB::TCPHandler::runImpl (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:268 21 0x000000000f68f297 in DB::TCPHandler::run (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:1414 22 0x0000000011fb81cf in Poco::Net::TCPServerConnection::start (this=0x0) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43 23 0x0000000011fb9be1 in Poco::Net::TCPServerDispatcher::run (this=0x7f752ab5fd00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:112 24 0x00000000120e71c9 in Poco::PooledThread::run (this=0x7f747d3a4580) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199 25 0x00000000120e315a in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/src/Thread_POSIX.cpp:345 26 0x00007f760620aea7 in start_thread (arg=<optimized out>) at pthread_create.c:477 27 0x00007f760613aeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95 (gdb) p to.data.__end_-to.data.__begin_ $17 = 10 (gdb) p to.data.__begin_[9].column.px $19 = (const DB::IColumn *) 0x7f7328392720 (gdb) p to.data.__begin_[8].column.px $20 = (const DB::IColumn *) 0x0 (gdb) p to.data.__begin_[7].column.px $21 = (const DB::IColumn *) 0x7f746f33d360 ``` Line numbers matched with this version - https://github.com/azat/ClickHouse/blob/f0e7cb16a729e2fbddd4f5804a48f7ea8928581b/src/Storages/StorageBuffer.cpp#L411 </details>
2020-12-21 20:48:18 +00:00
/// But if there is still nothing, abort
if (!col_to)
throw Exception("No column to rollback", ErrorCodes::LOGICAL_ERROR);
if (col_to->size() != old_rows)
2020-05-14 08:30:18 +00:00
col_to = col_to->cut(0, old_rows);
}
}
catch (...)
{
/// In case when we cannot rollback, do not leave incorrect state in memory.
std::terminate();
}
throw;
}
}
2021-07-23 14:25:35 +00:00
class BufferSink : public SinkToStorage
{
public:
2021-07-23 14:25:35 +00:00
explicit BufferSink(
StorageBuffer & storage_,
const StorageMetadataPtr & metadata_snapshot_)
2021-07-23 14:25:35 +00:00
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{
// Check table structure.
metadata_snapshot->check(getHeader(), true);
2021-07-23 14:25:35 +00:00
}
2021-07-23 19:33:59 +00:00
String getName() const override { return "BufferSink"; }
2021-07-23 14:25:35 +00:00
void consume(Chunk chunk) override
{
size_t rows = chunk.getNumRows();
if (!rows)
return;
auto block = getHeader().cloneWithColumns(chunk.getColumns());
2021-07-23 14:25:35 +00:00
StoragePtr destination;
2020-02-17 19:28:25 +00:00
if (storage.destination_id)
{
destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id, storage.getContext());
if (destination.get() == &storage)
throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
}
size_t bytes = block.bytes();
storage.lifetime_writes.rows += rows;
storage.lifetime_writes.bytes += bytes;
/// If the block already exceeds the maximum limit, then we skip the buffer.
if (rows > storage.max_thresholds.rows || bytes > storage.max_thresholds.bytes)
{
2020-02-17 19:28:25 +00:00
if (storage.destination_id)
{
LOG_DEBUG(storage.log, "Writing block with {} rows, {} bytes directly.", rows, bytes);
storage.writeBlockToDestination(block, destination);
}
return;
}
/// We distribute the load on the shards by the stream number.
2020-02-02 02:35:47 +00:00
const auto start_shard_num = getThreadId() % storage.num_shards;
/// We loop through the buffers, trying to lock mutex. No more than one lap.
auto shard_num = start_shard_num;
StorageBuffer::Buffer * least_busy_buffer = nullptr;
std::unique_lock<std::mutex> least_busy_lock;
size_t least_busy_shard_rows = 0;
for (size_t try_no = 0; try_no < storage.num_shards; ++try_no)
{
std::unique_lock lock(storage.buffers[shard_num].tryLock());
if (lock.owns_lock())
{
size_t num_rows = storage.buffers[shard_num].data.rows();
if (!least_busy_buffer || num_rows < least_busy_shard_rows)
{
least_busy_buffer = &storage.buffers[shard_num];
least_busy_lock = std::move(lock);
least_busy_shard_rows = num_rows;
}
}
shard_num = (shard_num + 1) % storage.num_shards;
}
/// If you still can not lock anything at once, then we'll wait on mutex.
if (!least_busy_buffer)
2018-08-24 14:51:34 +00:00
{
least_busy_buffer = &storage.buffers[start_shard_num];
least_busy_lock = least_busy_buffer->lockForWriting();
2018-08-24 14:51:34 +00:00
}
insertIntoBuffer(block, *least_busy_buffer);
least_busy_lock.unlock();
storage.reschedule();
}
private:
StorageBuffer & storage;
StorageMetadataPtr metadata_snapshot;
2018-08-24 14:51:34 +00:00
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer)
{
time_t current_time = time(nullptr);
/// Sort the columns in the block. This is necessary to make it easier to concatenate the blocks later.
Block sorted_block = block.sortColumns();
if (storage.checkThresholds(buffer, /* direct= */true, current_time, sorted_block.rows(), sorted_block.bytes()))
{
/** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
* This also protects against unlimited consumption of RAM, since if it is impossible to write to the table,
* an exception will be thrown, and new data will not be added to the buffer.
*/
storage.flushBuffer(buffer, false /* check_thresholds */, true /* locked */);
}
if (!buffer.first_write_time)
buffer.first_write_time = current_time;
size_t old_rows = buffer.data.rows();
size_t old_bytes = buffer.data.allocatedBytes();
appendBlock(sorted_block, buffer.data);
storage.total_writes.rows += (buffer.data.rows() - old_rows);
storage.total_writes.bytes += (buffer.data.allocatedBytes() - old_bytes);
}
};
2021-07-23 14:25:35 +00:00
/// Creates the sink that performs INSERTs into this Buffer table.
SinkToStoragePtr StorageBuffer::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
    auto sink = std::make_shared<BufferSink>(*this, metadata_snapshot);
    return sink;
}
bool StorageBuffer::mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & /*metadata_snapshot*/) const
2018-03-16 09:00:04 +00:00
{
2020-02-17 19:28:25 +00:00
if (!destination_id)
2018-03-16 09:00:04 +00:00
return false;
2020-05-28 23:01:18 +00:00
auto destination = DatabaseCatalog::instance().getTable(destination_id, query_context);
2018-03-16 09:00:04 +00:00
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
return destination->mayBenefitFromIndexForIn(left_in_operand, query_context, destination->getInMemoryMetadataPtr());
2018-03-16 09:00:04 +00:00
}
void StorageBuffer::startup()
{
if (getContext()->getSettingsRef().readonly)
{
LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate buffer_profile to fix this.", getName());
}
flush_handle->activateAndSchedule();
}
/// Stops the background flush task and pushes all buffered data out once.
/// Errors are logged rather than propagated. Does nothing if there is no flush task.
void StorageBuffer::flush()
{
    if (flush_handle)
    {
        flush_handle->deactivate();

        try
        {
            optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, getContext());
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
}
/** NOTE If you do OPTIMIZE after insertion,
* it does not guarantee, that all data will be in destination table at the time of next SELECT just after OPTIMIZE.
*
* Because in case if there was already running flushBuffer method,
* then call to flushBuffer inside OPTIMIZE will see empty buffer and return quickly,
* but at the same time, the already running flushBuffer method possibly is not finished,
* so next SELECT will observe missing data.
*
* This kind of race condition make very hard to implement proper tests.
*/
2020-06-17 13:39:26 +00:00
bool StorageBuffer::optimize(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & /* deduplicate_by_columns */,
ContextPtr /*context*/)
{
if (partition)
throw Exception("Partition cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
if (final)
throw Exception("FINAL cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
if (deduplicate)
throw Exception("DEDUPLICATE cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
flushAllBuffers(false);
return true;
}
/// PREWHERE is supported only if an existing destination table (other than this one) supports it.
bool StorageBuffer::supportsPrewhere() const
{
    if (!destination_id)
        return false;

    const auto dest = DatabaseCatalog::instance().tryGetTable(destination_id, getContext());
    if (!dest || dest.get() == this)
        return false;

    return dest->supportsPrewhere();
}
/// Returns whether `buffer`, after growing by additional_rows and additional_bytes, would cross a flush threshold.
/// The buffer's age is 0 if it has not been written to yet (first_write_time == 0).
bool StorageBuffer::checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows, size_t additional_bytes) const
{
    const time_t age = buffer.first_write_time ? current_time - buffer.first_write_time : 0;

    return checkThresholdsImpl(
        direct,
        buffer.data.rows() + additional_rows,
        buffer.data.bytes() + additional_bytes,
        age);
}
bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const
2015-12-09 06:55:49 +00:00
{
if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedAllMinThresholds);
return true;
}
if (time_passed > max_thresholds.time)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeMaxThreshold);
return true;
}
if (rows > max_thresholds.rows)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsMaxThreshold);
return true;
}
if (bytes > max_thresholds.bytes)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesMaxThreshold);
return true;
}
if (!direct)
{
if (flush_thresholds.time && time_passed > flush_thresholds.time)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeFlushThreshold);
return true;
}
if (flush_thresholds.rows && rows > flush_thresholds.rows)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsFlushThreshold);
return true;
}
if (flush_thresholds.bytes && bytes > flush_thresholds.bytes)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesFlushThreshold);
return true;
}
}
return false;
}
/// Flushes every layer in turn; flushBuffer acquires each layer's lock itself (locked = false).
void StorageBuffer::flushAllBuffers(bool check_thresholds)
{
    for (auto & layer : buffers)
        flushBuffer(layer, check_thresholds, /* locked= */ false);
}
/// Moves the contents of `buffer` to the destination table (or discards them when no destination is configured).
///
/// check_thresholds: if true, flush only when checkThresholdsImpl(direct = false) says so; otherwise return false.
/// locked:           true if the caller already holds the buffer's lock.
/// Returns true if the buffer was flushed.
/// If the write fails, the data is put back into the buffer and the exception is rethrown.
bool StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool locked)
{
    /// Declared before the lock so that it is destroyed after the lock is released.
    Block pending;
    const time_t now = time(nullptr);

    std::optional<std::unique_lock<std::mutex>> guard;
    if (!locked)
        guard.emplace(buffer.lockForReading());

    const size_t rows = buffer.data.rows();
    const size_t bytes = buffer.data.bytes();
    const time_t time_passed = buffer.first_write_time ? now - buffer.first_write_time : 0;

    if (check_thresholds && !checkThresholdsImpl(/* direct= */false, rows, bytes, time_passed))
        return false;

    /// Detach the accumulated data; the layer becomes empty.
    buffer.data.swap(pending);
    buffer.first_write_time = 0;

    const size_t pending_rows = pending.rows();
    const size_t pending_bytes = pending.bytes();
    const size_t pending_allocated_bytes = pending.allocatedBytes() - buffer.data.allocatedBytes();

    CurrentMetrics::sub(CurrentMetrics::StorageBufferRows, pending_rows);
    CurrentMetrics::sub(CurrentMetrics::StorageBufferBytes, pending_bytes);

    ProfileEvents::increment(ProfileEvents::StorageBufferFlush);

    if (!destination_id)
    {
        total_writes.rows -= pending_rows;
        total_writes.bytes -= pending_allocated_bytes;

        LOG_DEBUG(log, "Flushing buffer with {} rows (discarded), {} bytes, age {} seconds {}.", rows, bytes, time_passed, (check_thresholds ? "(bg)" : "(direct)"));
        return true;
    }

    /** The layer stays locked while writing. Unlocking would make the in-flight data invisible
      * to SELECT, would require merging newly appended data with the unwritten block on failure,
      * and could let memory grow without bound.
      */
    Stopwatch watch;
    try
    {
        writeBlockToDestination(pending, DatabaseCatalog::instance().tryGetTable(destination_id, getContext()));
    }
    catch (...)
    {
        ProfileEvents::increment(ProfileEvents::StorageBufferErrorOnFlush);

        /// Put the data back; the next flush attempt happens later.
        CurrentMetrics::add(CurrentMetrics::StorageBufferRows, pending.rows());
        CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, pending.bytes());

        buffer.data.swap(pending);

        if (!buffer.first_write_time) // -V547
            buffer.first_write_time = now;

        throw;
    }

    total_writes.rows -= pending_rows;
    total_writes.bytes -= pending_allocated_bytes;

    const UInt64 milliseconds = watch.elapsedMilliseconds();
    LOG_DEBUG(log, "Flushing buffer with {} rows, {} bytes, age {} seconds, took {} ms {}.", rows, bytes, time_passed, milliseconds, (check_thresholds ? "(bg)" : "(direct)"));
    return true;
}
void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr table)
{
2020-02-17 19:28:25 +00:00
if (!destination_id || !block)
return;
if (!table)
{
2020-05-23 22:24:01 +00:00
LOG_ERROR(log, "Destination table {} doesn't exist. Block of data is discarded.", destination_id.getNameForLogs());
return;
}
auto destination_metadata_snapshot = table->getInMemoryMetadataPtr();
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
auto insert = std::make_shared<ASTInsertQuery>();
2020-03-02 20:23:58 +00:00
insert->table_id = destination_id;
/** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
* This will support some of the cases (but not all) when the table structure does not match.
*/
Block structure_of_destination_table = allow_materialized ? destination_metadata_snapshot->getSampleBlock()
: destination_metadata_snapshot->getSampleBlockNonMaterialized();
Block block_to_write;
2021-06-15 19:55:21 +00:00
for (size_t i : collections::range(0, structure_of_destination_table.columns()))
{
auto dst_col = structure_of_destination_table.getByPosition(i);
if (block.has(dst_col.name))
{
auto column = block.getByName(dst_col.name);
if (!column.type->equals(*dst_col.type))
{
2020-05-23 22:24:01 +00:00
LOG_WARNING(log, "Destination table {} have different type of column {} ({} != {}). Block of data is converted.", destination_id.getNameForLogs(), backQuoteIfNeed(column.name), dst_col.type->getName(), column.type->getName());
2020-04-14 21:05:45 +00:00
column.column = castColumn(column, dst_col.type);
column.type = dst_col.type;
}
block_to_write.insert(column);
}
}
if (block_to_write.columns() == 0)
{
2020-05-23 22:24:01 +00:00
LOG_ERROR(log, "Destination table {} have no common columns with block in buffer. Block of data is discarded.", destination_id.getNameForLogs());
return;
}
if (block_to_write.columns() != block.columns())
2020-05-23 22:24:01 +00:00
LOG_WARNING(log, "Not all columns from block in buffer exist in destination table {}. Some columns are discarded.", destination_id.getNameForLogs());
auto list_of_columns = std::make_shared<ASTExpressionList>();
insert->columns = list_of_columns;
list_of_columns->children.reserve(block_to_write.columns());
2018-11-19 15:20:34 +00:00
for (const auto & column : block_to_write)
list_of_columns->children.push_back(std::make_shared<ASTIdentifier>(column.name));
auto insert_context = Context::createCopy(getContext());
insert_context->makeQueryContext();
2020-07-06 17:24:33 +00:00
InterpreterInsertQuery interpreter{insert, insert_context, allow_materialized};
auto block_io = interpreter.execute();
2021-09-16 17:40:42 +00:00
PushingPipelineExecutor executor(block_io.pipeline);
executor.start();
executor.push(std::move(block_to_write));
executor.finish();
}
/// Body of the background flush task.
/// Flushes layers whose thresholds are met, logs any error, and always reschedules itself.
void StorageBuffer::backgroundFlush()
{
    try
    {
        flushAllBuffers(/* check_thresholds= */ true);
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    reschedule();
}
/// Schedules the next background flush.
///
/// The delay (at least 1 second) is derived from the age of the oldest non-empty
/// layer that could be locked without waiting, and the min/max/flush time thresholds.
/// If no such layer holds data, nothing is scheduled; the next INSERT calls this again.
void StorageBuffer::reschedule()
{
    time_t min_first_write_time = std::numeric_limits<time_t>::max();
    size_t rows = 0;

    for (auto & buffer : buffers)
    {
        /// try_to_lock here to avoid waiting for other layers flushing to be finished,
        /// since the buffer table may:
        /// - push to Distributed table, that may take too much time,
        /// - push to table with materialized views attached,
        ///   this is also may take some time.
        ///
        /// try_to_lock is also ok for background flush, since if there is
        /// INSERT contended, then the reschedule will be done after
        /// INSERT will be done.
        std::unique_lock lock(buffer.tryLock());
        if (!lock.owns_lock())
            continue;

        /// Empty layers have first_write_time == 0 and must not influence the schedule.
        const size_t buffer_rows = buffer.data.rows();
        if (!buffer_rows)
            continue;

        /// Keep the OLDEST first write across all layers.
        min_first_write_time = std::min(min_first_write_time, buffer.first_write_time);
        rows += buffer_rows;
    }

    /// will be rescheduled via INSERT
    if (!rows)
        return;

    time_t current_time = time(nullptr);
    time_t time_passed = current_time - min_first_write_time;

    size_t min = std::max<ssize_t>(min_thresholds.time - time_passed, 1);
    size_t max = std::max<ssize_t>(max_thresholds.time - time_passed, 1);
    /// When flush_time is 0 this yields 1 second. That periodic wake-up is what gets
    /// flush_rows/flush_bytes evaluated, since those are only checked for non-direct flushes.
    size_t flush = std::max<ssize_t>(flush_thresholds.time - time_passed, 1);
    flush_handle->scheduleAfter(std::min({min, max, flush}) * 1000);
}
void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
2019-12-26 18:17:05 +00:00
{
auto name_deps = getDependentViewsByColumn(local_context);
2019-12-26 18:17:05 +00:00
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN
&& command.type != AlterCommand::Type::COMMENT_TABLE)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by storage {}",
command.type, getName());
if (command.type == AlterCommand::Type::DROP_COLUMN && !command.clear)
{
2021-02-28 07:42:08 +00:00
const auto & deps_mv = name_deps[command.column_name];
if (!deps_mv.empty())
{
throw Exception(
"Trying to ALTER DROP column " + backQuoteIfNeed(command.column_name) + " which is referenced by materialized view "
+ toString(deps_mv),
ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN);
}
}
2019-12-26 18:17:05 +00:00
}
}
2020-11-25 13:47:32 +00:00
std::optional<UInt64> StorageBuffer::totalRows(const Settings & settings) const
{
std::optional<UInt64> underlying_rows;
auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, getContext());
if (underlying)
2020-11-25 13:47:32 +00:00
underlying_rows = underlying->totalRows(settings);
if (!underlying_rows)
return underlying_rows;
return total_writes.rows + *underlying_rows;
}
2020-11-25 13:47:32 +00:00
std::optional<UInt64> StorageBuffer::totalBytes(const Settings & /*settings*/) const
{
return total_writes.bytes;
}
2021-10-25 17:49:49 +00:00
void StorageBuffer::alter(const AlterCommands & params, ContextPtr local_context, AlterLockHolder &)
{
auto table_id = getStorageID();
checkAlterIsPossible(params, local_context);
2020-06-17 13:39:26 +00:00
auto metadata_snapshot = getInMemoryMetadataPtr();
2020-09-23 12:06:54 +00:00
/// Flush all buffers to storages, so that no non-empty blocks of the old
/// structure remain. Structure of empty blocks will be updated during first
/// insert.
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, {}, local_context);
2020-06-17 13:39:26 +00:00
StorageInMemoryMetadata new_metadata = *metadata_snapshot;
params.apply(new_metadata, local_context);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
setInMemoryMetadata(new_metadata);
}
void registerStorageBuffer(StorageFactory & factory)
{
    /** Buffer(db, table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes[, flush_time, flush_rows, flush_bytes])
      *
      * db, table - in which table to put data from buffer (empty table name: data is discarded on flush).
      * num_buckets - level of parallelism (number of independent buffer layers); must be > 0.
      * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer.
      * flush_time, flush_rows, flush_bytes - optional additional conditions for flushing.
      */

    factory.registerStorage("Buffer", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;

        if (engine_args.size() < 9 || engine_args.size() > 12)
            throw Exception("Storage Buffer requires from 9 to 12 parameters: "
                " destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes[, flush_time, flush_rows, flush_bytes].",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        // Table and database name arguments accept expressions, evaluate them.
        engine_args[0] = evaluateConstantExpressionForDatabaseName(engine_args[0], args.getLocalContext());
        engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.getLocalContext());

        // After we evaluated all expressions, check that all arguments are
        // literals.
        for (size_t i = 0; i < engine_args.size(); ++i)
        {
            if (!typeid_cast<ASTLiteral *>(engine_args[i].get()))
            {
                throw Exception(ErrorCodes::BAD_ARGUMENTS,
                    "Storage Buffer expects a literal as an argument #{}, got '{}'"
                    " instead", i, engine_args[i]->formatForErrorMessage());
            }
        }

        size_t i = 0;

        String destination_database = engine_args[i++]->as<ASTLiteral &>().value.safeGet<String>();
        String destination_table = engine_args[i++]->as<ASTLiteral &>().value.safeGet<String>();

        UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);

        /// INSERT picks a layer with `getThreadId() % num_shards`, so zero layers would be a
        /// division by zero on the first insert. Reject it for new tables. ATTACH is not
        /// rejected, so that already-existing metadata can still be loaded.
        if (num_buckets == 0 && !args.attach)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Storage Buffer requires num_buckets to be greater than 0");

        StorageBuffer::Thresholds min;
        StorageBuffer::Thresholds max;
        StorageBuffer::Thresholds flush;

        min.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
        max.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
        min.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
        max.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
        min.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
        max.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
        if (engine_args.size() > i)
            flush.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
        if (engine_args.size() > i)
            flush.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
        if (engine_args.size() > i)
            flush.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);

        /// If destination_id is not set, do not write data from the buffer, but simply empty the buffer.
        StorageID destination_id = StorageID::createEmpty();
        if (!destination_table.empty())
        {
            destination_id.database_name = args.getContext()->resolveDatabase(destination_database);
            destination_id.table_name = destination_table;
        }

        return StorageBuffer::create(
            args.table_id,
            args.columns,
            args.constraints,
            args.comment,
            args.getContext(),
            num_buckets,
            min,
            max,
            flush,
            destination_id,
            static_cast<bool>(args.getLocalContext()->getSettingsRef().insert_allow_materialized_columns));
    },
    {
        .supports_parallel_insert = true,
        .supports_schema_inference = true,
    });
}
}