Merge

parent d77c0747ee
commit 7dca2f21b9
@@ -45,6 +45,8 @@ struct Settings
 	bool extremes;
 	/// Whether to use the cache of decompressed blocks.
 	bool use_uncompressed_cache;
+	/// Minimum number of written rows after which an fsync or sync should be done. 0 - never do it.
+	size_t min_rows_to_sync;
 
 	/// All possible limits on query execution.
 	Limits limits;
@@ -64,7 +66,8 @@ struct Settings
 		poll_interval(DBMS_DEFAULT_POLL_INTERVAL),
 		distributed_connections_pool_size(DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE),
 		connections_with_failover_max_tries(DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES),
-		sign_rewrite(false), extremes(false), use_uncompressed_cache(true)
+		sign_rewrite(false), extremes(false), use_uncompressed_cache(true),
+		min_rows_to_sync(1000000)
 	{
 	}
 
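The two Settings hunks above introduce the new min_rows_to_sync member with a default of 1000000 rows. Below is a minimal sketch of the gate this setting enables; the simplified Settings struct and maybeSyncAfterWrite are illustrative stand-ins, not the actual ClickHouse code.

#include <cstddef>
#include <unistd.h>   // ::sync()

// Simplified stand-in for the real DB::Settings; only the new field is shown.
struct Settings
{
    /// Minimum number of written rows after which an fsync or sync should be done. 0 - never do it.
    size_t min_rows_to_sync = 1000000;
};

// Hypothetical writer hook: after writing `rows` rows, decide whether to ask the
// OS to flush dirty pages to disk, mirroring the gate added in this commit.
void maybeSyncAfterWrite(const Settings & settings, size_t rows)
{
    if (settings.min_rows_to_sync && rows >= settings.min_rows_to_sync)
        ::sync();   // one global sync instead of per-file fsync
}

Per the header comment, setting min_rows_to_sync to 0 disables the sync entirely.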
@@ -144,7 +144,7 @@ private:
 			for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it)
 				(*it)->type->serializeBinary((*(*it)->column)[i], index);
 
-			index.sync();
+			index.next();
 		}
 
 		LOG_TRACE(storage.log, "Writing data.");
@@ -157,6 +157,14 @@ private:
 			const ColumnWithNameAndType & column = block.getByPosition(i);
 			writeData(part_tmp_path, column.name, *column.type, *column.column, offset_columns);
 		}
 
+		/// If needed, ask the OS to flush the data to disk.
+		size_t min_rows_to_sync = storage.context.getSettings().min_rows_to_sync;
+		if (min_rows_to_sync && rows >= min_rows_to_sync)
+		{
+			LOG_TRACE(storage.log, "sync()");
+			::sync();	/// If you call fsync for each file separately, it slows things down even more.
+		}
+
 		LOG_TRACE(storage.log, "Renaming.");
 
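The added block issues a single global ::sync() instead of per-file fsync(), matching the comment that calling fsync on every column file is slower. For reference, a brief POSIX sketch contrasting the two calls; the file name is made up.

#include <fcntl.h>
#include <unistd.h>

void flush_examples()
{
    // Per-file durability: fsync() blocks until this one file's data reaches
    // stable storage. Doing this for every column file of a part adds up.
    int fd = ::open("column.bin", O_WRONLY | O_CREAT, 0644);
    if (fd >= 0)
    {
        ::fsync(fd);
        ::close(fd);
    }

    // Whole-system flush: sync() asks the kernel to write out all dirty buffers.
    // One call covers every file of the part, which is what this commit opts for.
    ::sync();
}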
@@ -220,8 +228,8 @@ private:
 			}
 
 			compressed.next();
-			plain.sync();
-			marks.sync();
+			plain.next();
+			marks.next();
 		}
 	}
 	if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
@@ -244,8 +252,8 @@ private:
 			}
 
 			compressed.next();
-			plain.sync();
-			marks.sync();
+			plain.next();
+			marks.next();
 		}
 
 		{
@@ -264,8 +272,8 @@ private:
 			}
 
 			compressed.next();
-			plain.sync();
-			marks.sync();
+			plain.next();
+			marks.next();
 		}
 	}
 };
@@ -16,8 +16,8 @@ class MergedBlockOutputStream : public IBlockOutputStream
 {
 public:
 	MergedBlockOutputStream(StorageMergeTree & storage_,
 		UInt16 min_date, UInt16 max_date, UInt64 min_part_id, UInt64 max_part_id, UInt32 level)
-		: storage(storage_), marks_count(0), index_offset(0)
+		: storage(storage_), marks_count(0), index_offset(0), total_rows(0)
 	{
 		part_name = storage.getPartName(
 			DayNum_t(min_date), DayNum_t(max_date),
@@ -37,6 +37,7 @@ public:
 	void write(const Block & block)
 	{
 		size_t rows = block.rows();
+		total_rows += rows;
 
 		/// First, write the index. The index contains the PK value for every index_granularity-th row.
 		typedef std::vector<const ColumnWithNameAndType *> PrimaryColumns;
@@ -75,16 +76,24 @@ public:
 	void writeSuffix()
 	{
 		/// Finish writing.
-		index_stream->sync();
+		index_stream->next();
 		index_stream = NULL;
 
 		for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
-			it->second->sync();
+			it->second->finalize();
 
 		column_streams.clear();
 
 		if (marks_count == 0)
 			throw Exception("Empty part", ErrorCodes::LOGICAL_ERROR);
 
+		/// If needed, ask the OS to flush the data to disk.
+		size_t min_rows_to_sync = storage.context.getSettings().min_rows_to_sync;
+		if (min_rows_to_sync && total_rows >= min_rows_to_sync)
+		{
+			LOG_TRACE(storage.log, "sync()");
+			::sync();	/// If you call fsync for each file separately, it slows things down even more.
+		}
+
 		/// Rename the part.
 		Poco::File(part_tmp_path).renameTo(part_res_path);
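writeSuffix() now flushes the index stream and each column stream (next()/finalize()), optionally performs the same row-count-gated ::sync() using total_rows accumulated across write() calls, and only then renames the temporary part directory into place. A self-contained sketch of that finish sequence follows; finishPart and the path handling are assumed illustrations, not ClickHouse API.

#include <cstddef>
#include <filesystem>
#include <unistd.h>

// Illustrative only: flush -> optional sync -> rename, mirroring writeSuffix().
void finishPart(const std::filesystem::path & tmp_path,
                const std::filesystem::path & res_path,
                size_t total_rows, size_t min_rows_to_sync)
{
    // 1. All buffered streams are assumed to have been flushed to the OS already
    //    (the next()/finalize() calls).

    // 2. If enough rows were written, ask the OS to push dirty pages to disk
    //    before the part becomes visible under its final name.
    if (min_rows_to_sync && total_rows >= min_rows_to_sync)
        ::sync();

    // 3. Publish the part by renaming the temporary directory.
    std::filesystem::rename(tmp_path, res_path);
}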
@@ -116,11 +125,11 @@ private:
 		CompressedWriteBuffer compressed;
 		WriteBufferFromFile marks;
 
-		void sync()
+		void finalize()
 		{
 			compressed.next();
-			plain.sync();
-			marks.sync();
+			plain.next();
+			marks.next();
 		}
 	};
 
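The per-column stream helper is renamed from sync() to finalize(): it flushes the compressed buffer into the plain file buffer and pushes the plain and marks buffers to the OS, without forcing an fsync; durability is now handled by the explicit ::sync() gate above. A generic standard-library sketch of the same flush-only pattern is below; BufferedColumnWriter is a made-up name, not ClickHouse code.

#include <fstream>
#include <string>

// Made-up illustration of the finalize() idea: flush user-space buffers so the
// data reaches the OS page cache, but leave on-disk durability to a separate,
// explicit sync step.
struct BufferedColumnWriter
{
    std::ofstream plain;   // stands in for the data file buffer
    std::ofstream marks;   // stands in for the marks file buffer

    explicit BufferedColumnWriter(const std::string & name)
        : plain(name + ".bin", std::ios::binary), marks(name + ".mrk", std::ios::binary) {}

    void finalize()
    {
        plain.flush();   // analogous to plain.next(): hand the buffer to the OS
        marks.flush();   // analogous to marks.next()
    }
};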
@@ -131,6 +140,9 @@ private:
 
 	/// Offset to the first row of the block for which the index should be written.
 	size_t index_offset;
 
+	/// Total number of rows written.
+	size_t total_rows;
+
 	typedef std::set<std::string> OffsetColumns;
 
@@ -268,4 +280,4 @@ private:
 
 typedef Poco::SharedPtr<MergedBlockOutputStream> MergedBlockOutputStreamPtr;
 
 }
@@ -107,10 +107,10 @@ private:
 
 		size_t plain_offset;	/// How many bytes were in the file when the LogBlockOutputStream was created.
 
-		void sync()
+		void finalize()
 		{
 			compressed.next();
-			plain.sync();
+			plain.next();
 		}
 	};
 
@@ -85,10 +85,10 @@ private:
 		WriteBufferFromFile plain;
 		CompressedWriteBuffer compressed;
 
-		void sync()
+		void finalize()
 		{
 			compressed.next();
-			plain.sync();
+			plain.next();
 		}
 	};
 
@@ -32,6 +32,7 @@ void Settings::set(const String & name, const Field & value)
 	else if (name == "sign_rewrite") sign_rewrite = safeGet<UInt64>(value);
 	else if (name == "extremes") extremes = safeGet<UInt64>(value);
 	else if (name == "use_uncompressed_cache") use_uncompressed_cache = safeGet<UInt64>(value);
+	else if (name == "min_rows_to_sync") min_rows_to_sync = safeGet<UInt64>(value);
 	else if (name == "profile") setProfile(get<const String &>(value));
 	else if (!limits.trySet(name, value))
 		throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
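A setting that arrives as a Field is dispatched on its name and read with safeGet<UInt64>; the hunks that follow show the other registration points a new UInt64 setting needs — the name whitelists in the ReadBuffer and String overloads of set(), and serialize(). A minimal sketch of the parse-and-assign step on its own; MiniSettings and parseSetting are illustrative, not the real code.

#include <cstdint>
#include <stdexcept>
#include <string>

struct MiniSettings
{
    uint64_t use_uncompressed_cache = 1;
    uint64_t min_rows_to_sync = 1000000;
};

// Illustrative name-based dispatch, mirroring the if/else chain in Settings::set.
void parseSetting(MiniSettings & s, const std::string & name, const std::string & value)
{
    if (name == "use_uncompressed_cache")
        s.use_uncompressed_cache = std::stoull(value);
    else if (name == "min_rows_to_sync")
        s.min_rows_to_sync = std::stoull(value);
    else
        throw std::invalid_argument("Unknown setting " + name);
}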
@@ -55,7 +56,8 @@ void Settings::set(const String & name, ReadBuffer & buf)
 		|| name == "connections_with_failover_max_tries"
 		|| name == "sign_rewrite"
 		|| name == "extremes"
-		|| name == "use_uncompressed_cache")
+		|| name == "use_uncompressed_cache"
+		|| name == "min_rows_to_sync")
 	{
 		UInt64 value = 0;
 		readVarUInt(value, buf);
@@ -89,7 +91,8 @@ void Settings::set(const String & name, const String & value)
 		|| name == "connections_with_failover_max_tries"
 		|| name == "sign_rewrite"
 		|| name == "extremes"
-		|| name == "use_uncompressed_cache")
+		|| name == "use_uncompressed_cache"
+		|| name == "min_rows_to_sync")
 	{
 		set(name, parse<UInt64>(value));
 	}
@@ -150,6 +153,7 @@ void Settings::serialize(WriteBuffer & buf) const
 	writeStringBinary("sign_rewrite", buf); writeVarUInt(sign_rewrite, buf);
 	writeStringBinary("extremes", buf); writeVarUInt(extremes, buf);
 	writeStringBinary("use_uncompressed_cache", buf); writeVarUInt(use_uncompressed_cache, buf);
+	writeStringBinary("min_rows_to_sync", buf); writeVarUInt(min_rows_to_sync, buf);
 
 	limits.serialize(buf);
 
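serialize() writes each setting as its name followed by its value, with writeVarUInt producing a variable-length integer. Assuming a LEB128-style encoding (7 payload bits per byte, high bit set on all but the last byte) — an assumption, since this diff does not show the helper itself — a sketch of such an encoder:

#include <cstdint>
#include <vector>

// Hedged sketch of a LEB128-style variable-length unsigned integer encoder,
// the kind of format a writeVarUInt helper typically implements.
void writeVarUIntSketch(uint64_t x, std::vector<uint8_t> & out)
{
    do
    {
        uint8_t byte = x & 0x7F;   // low 7 bits of the remaining value
        x >>= 7;
        if (x)
            byte |= 0x80;          // continuation flag: more bytes follow
        out.push_back(byte);
    } while (x);
}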
@@ -144,7 +144,7 @@ void StorageChunks::appendChunkToIndex(const std::string & chunk_name, size_t ma
 	WriteBufferFromFile index(index_path, 4096, O_APPEND | O_CREAT | O_WRONLY);
 	writeStringBinary(chunk_name, index);
 	writeIntBinary<UInt64>(mark, index);
-	index.sync();
+	index.next();
 }
 
 void StorageChunks::dropThis()
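appendChunkToIndex() opens the chunk index with O_APPEND | O_CREAT | O_WRONLY, appends the chunk name and its mark, and now finishes with index.next() (flush the write buffer) rather than index.sync(). A bare-bones POSIX sketch of the same append-a-record idea follows; the record layout here is illustrative, not the actual on-disk format.

#include <cstdint>
#include <fcntl.h>
#include <string>
#include <unistd.h>

// Append one (name, mark) record to an index file; purely illustrative layout.
void appendIndexRecord(const std::string & index_path, const std::string & chunk_name, uint64_t mark)
{
    int fd = ::open(index_path.c_str(), O_APPEND | O_CREAT | O_WRONLY, 0644);
    if (fd < 0)
        return;

    uint64_t len = chunk_name.size();
    ::write(fd, &len, sizeof(len));                     // name length
    ::write(fd, chunk_name.data(), chunk_name.size());  // name bytes
    ::write(fd, &mark, sizeof(mark));                   // mark value
    ::close(fd);                                        // no fsync: durability left to the caller
}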
@@ -219,10 +219,10 @@ void LogBlockOutputStream::write(const Block & block)
 void LogBlockOutputStream::writeSuffix()
 {
 	/// Finish writing.
-	marks_stream.sync();
+	marks_stream.next();
 
 	for (FileStreams::iterator it = streams.begin(); it != streams.end(); ++it)
-		it->second->sync();
+		it->second->finalize();
 
 	streams.clear();
 }
@@ -249,7 +249,7 @@ void TinyLogBlockOutputStream::writeSuffix()
 {
 	/// Finish writing.
 	for (FileStreams::iterator it = streams.begin(); it != streams.end(); ++it)
-		it->second->sync();
+		it->second->finalize();
 
 	streams.clear();
 }