fix functionality with wide parts that was broken by refactoring

CurtizJ 2019-12-19 17:05:26 +03:00
parent ba2a630a13
commit 206cb1afa9
25 changed files with 31 additions and 142 deletions

View File

@ -90,7 +90,7 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
void fillIndexGranularityImpl(
static void fillIndexGranularityImpl(
const Block & block,
size_t index_granularity_bytes,
size_t fixed_index_granularity_rows,
@ -98,7 +98,7 @@ void fillIndexGranularityImpl(
size_t index_offset,
MergeTreeIndexGranularity & index_granularity,
bool can_use_adaptive_index_granularity,
bool need_finish_last_granule)
bool need_finish_last_granule = false)
{
/// FIXME correct index granularity for compact
size_t rows_in_block = block.rows();
@ -151,9 +151,6 @@ void fillIndexGranularityImpl(
else
index_granularity.appendMark(index_granularity_for_block);
}
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
std::cerr << "marks: " << index_granularity.getMarkRows(i) << "\n";
}
void IMergeTreeDataPartWriter::fillIndexGranularity(const Block & block)
@ -226,7 +223,6 @@ void IMergeTreeDataPartWriter::calculateAndSerializePrimaryIndex(const Block & p
/// Write index. The index contains Primary Key value for each `index_granularity` row.
std::cerr << "writing index...\n";
for (size_t i = index_offset; i < rows;)
{
if (storage.hasPrimaryKey())
@ -239,10 +235,6 @@ void IMergeTreeDataPartWriter::calculateAndSerializePrimaryIndex(const Block & p
}
}
std::cerr << "(index) i: " << i << "\n";
std::cerr << "(index) current_mark: " << current_mark << "\n";
std::cerr << "(index) rows in mark: " << index_granularity.getMarkRows(current_mark) << "\n";
i += index_granularity.getMarkRows(current_mark++);
if (current_mark >= index_granularity.getMarksCount())
break;
@ -322,8 +314,6 @@ void IMergeTreeDataPartWriter::calculateAndSerializeSkipIndices(
void IMergeTreeDataPartWriter::finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums)
{
std::cerr << "finishPrimaryIndexSerialization called...\n";
bool write_final_mark = (with_final_mark && data_written);
if (write_final_mark && compute_granularity)
index_granularity.appendMark(0);
@ -341,9 +331,6 @@ void IMergeTreeDataPartWriter::finishPrimaryIndexSerialization(MergeTreeData::Da
last_index_row.clear();
}
std::cerr << "(finishPrimaryIndexSerialization) marks_count: " << index_granularity.getMarksCount() << "\n";
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
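The primary.idx handling above follows a pattern used throughout these writers: a hashing output stream tracks how many bytes were written and a running hash, and the finish*() method copies both into the checksums map. A standalone sketch of that pattern, with hypothetical names in place of the ClickHouse classes:

#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-ins for the ClickHouse checksum/stream types.
struct FileChecksum { size_t file_size = 0; uint64_t file_hash = 0; };

struct HashingStreamSketch
{
    size_t bytes = 0;
    uint64_t hash = 1469598103934665603ull; // FNV-1a offset basis

    void write(const char * data, size_t size)
    {
        for (size_t i = 0; i < size; ++i)
        {
            hash ^= static_cast<unsigned char>(data[i]);
            hash *= 1099511628211ull; // FNV-1a prime
        }
        bytes += size;
    }

    size_t count() const { return bytes; }
    uint64_t getHash() const { return hash; }
};

// Mirrors the finishPrimaryIndexSerialization() lines above.
void finishFile(std::map<std::string, FileChecksum> & checksums, const std::string & name, const HashingStreamSketch & stream)
{
    checksums[name].file_size = stream.count();
    checksums[name].file_hash = stream.getHash();
}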

View File

@ -94,12 +94,14 @@ public:
const MergeTreeData::ColumnSizeByName & getColumnsSizes() const { return columns_sizes; }
void setOffsetColumns(WrittenOffsetColumns * written_offset_columns_, bool skip_offsets_)
void setWrittenOffsetColumns(WrittenOffsetColumns * written_offset_columns_)
{
written_offset_columns = written_offset_columns_;
skip_offsets = skip_offsets_;
}
using SkipIndices = std::vector<MergeTreeIndexPtr>;
const SkipIndices & getSkipIndices() { return skip_indices; }
void initSkipIndices();
void initPrimaryIndex();
@ -160,8 +162,6 @@ protected:
/// To correctly write Nested elements column-by-column.
WrittenOffsetColumns * written_offset_columns = nullptr;
bool skip_offsets = false;
};
}

View File

@ -155,8 +155,6 @@ Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
UInt64 recommended_rows = estimateNumRows(*task, task->range_reader);
UInt64 rows_to_read = std::max(UInt64(1), std::min(current_max_block_size_rows, recommended_rows));
// std::cerr << "(readFromPartImpl) rows_to_read: " << rows_to_read << "\n";
auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);
/// All rows were filtered. Repeat.

View File

@ -296,16 +296,13 @@ void MergeTreeData::setProperties(
Names new_primary_key_columns;
Names new_sorting_key_columns;
NameSet sorting_key_columns_set;
NameSet primary_key_columns_set;
for (size_t i = 0; i < sorting_key_size; ++i)
{
String sorting_key_column = new_sorting_key_expr_list->children[i]->getColumnName();
new_sorting_key_columns.push_back(sorting_key_column);
if (!sorting_key_columns_set.emplace(sorting_key_column).second)
throw Exception("Sorting key contains duplicate columns", ErrorCodes::BAD_ARGUMENTS);
if (i < primary_key_size)
{
String pk_column = new_primary_key_expr_list->children[i]->getColumnName();
@ -314,6 +311,9 @@ void MergeTreeData::setProperties(
+ toString(i) + " its column is " + pk_column + ", not " + sorting_key_column,
ErrorCodes::BAD_ARGUMENTS);
if (!primary_key_columns_set.emplace(pk_column).second)
throw Exception("Primary key contains duplicate columns", ErrorCodes::BAD_ARGUMENTS);
new_primary_key_columns.push_back(pk_column);
}
}
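The checks above rely on set insertion to detect duplicates: emplace(...).second is false when the element is already present. A minimal standalone sketch of the same check, with hypothetical names:

#include <stdexcept>
#include <string>
#include <unordered_set>
#include <vector>

// Standalone sketch: reject a key definition that names the same column twice.
void checkNoDuplicateColumns(const std::vector<std::string> & key_columns, const std::string & key_name)
{
    std::unordered_set<std::string> seen;
    for (const auto & name : key_columns)
        if (!seen.emplace(name).second)
            throw std::invalid_argument(key_name + " contains duplicate columns: " + name);
}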
@ -840,8 +840,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
const auto & part_name = part_names_with_disks[i].first;
const auto part_disk_ptr = part_names_with_disks[i].second;
std::cerr << "(loadDataParts) loading part: " << part_name << "\n";
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
return;
@ -1678,7 +1676,6 @@ void MergeTreeData::alterDataPart(
/// Apply the expression and write the result to temporary files.
if (res.expression)
{
std::cerr << "(alterDataPart) expression: " << res.expression->dumpActions() << "\n";
BlockInputStreamPtr part_in = std::make_shared<MergeTreeSequentialBlockInputStream>(
*this, part, res.expression->getRequiredColumns(), false, /* take_column_types_from_storage = */ false);
@ -1687,8 +1684,6 @@ void MergeTreeData::alterDataPart(
static_cast<double>(part->bytes_on_disk) / this->getTotalActiveSizeInBytes());
ExpressionBlockInputStream in(part_in, res.expression);
std::cerr << "im.header: " << in.getHeader().dumpStructure() << "\n";
/** Don't write offsets for arrays, because ALTER never change them
* (MODIFY COLUMN could only change types of elements but never modify array sizes).
* Also note that they does not participate in 'rename_map'.
@ -1696,7 +1691,6 @@ void MergeTreeData::alterDataPart(
* temporary column name ('converting_column_name') created in 'createConvertExpression' method
* will have old name of shared offsets for arrays.
*/
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
part,

View File

@ -668,7 +668,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
}
MergeStageProgress horizontal_stage_progress(
column_sizes ? 1.0 : column_sizes->keyColumnsWeight());
column_sizes ? column_sizes->keyColumnsWeight() : 1.0);
for (const auto & part : parts)
{
@ -963,8 +963,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
if (isCompactPart(source_part))
commands_for_part.additional_columns = source_part->columns.getNames();
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading, true);
if (!isStorageTouchedByMutations(storage_from_source_part, commands_for_part, context_for_reading))
{
LOG_TRACE(log, "Part " << source_part->name << " doesn't change up to mutation version " << future_part.part_info.mutation);
@ -973,6 +971,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
else
LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation);
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading, true);
auto in = mutations_interpreter.execute(table_lock_holder);
const auto & updated_header = mutations_interpreter.getUpdatedHeader();

View File

@ -185,10 +185,6 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
buffer.seek(columns.size() * sizeof(MarkInCompressedFile), SEEK_CUR);
}
std::cerr << "(loadIndexGranularity) marks: " << index_granularity.getMarksCount() << "\n";
std::cerr << "(loadIndexGranularity) mark size: " << index_granularity_info.mark_size_in_bytes << "\n";
std::cerr << "(loadIndexGranularity) marks file size: " << marks_file_size << "\n";
if (index_granularity.getMarksCount() * index_granularity_info.mark_size_in_bytes != marks_file_size)
throw Exception("Cannot read all marks from file " + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);

View File

@ -181,7 +181,7 @@ void MergeTreeDataPartWide::loadIndexGranularity()
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
/// We can use any column, it doesn't matter
std::string marks_file_path = index_granularity_info.getMarksFilePath(full_path + escapeForFileName(columns.front().name));
std::string marks_file_path = index_granularity_info.getMarksFilePath(full_path + getFileNameForColumn(columns.front()));
if (!Poco::File(marks_file_path).exists())
throw Exception("Marks file '" + marks_file_path + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);
@ -431,7 +431,8 @@ NameToNameMap MergeTreeDataPartWide::createRenameMapForAlter(
String MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & column) const
{
String filename;
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path) {
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
if (filename.empty())
filename = IDataType::getFileNameForStream(column.name, substream_path);
});

View File

@ -33,7 +33,7 @@ public:
using Checksums = MergeTreeDataPartChecksums;
using Checksum = MergeTreeDataPartChecksums::Checksum;
MergeTreeDataPartWide(
MergeTreeDataPartWide(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
@ -86,7 +86,7 @@ public:
~MergeTreeDataPartWide() override;
bool hasColumnFiles(const String & column, const IDataType & type) const override;
bool hasColumnFiles(const String & column, const IDataType & type) const override;
protected:
void checkConsistency(bool require_part_metadata) const override;

View File

@ -40,8 +40,6 @@ void MergeTreeDataPartWriterCompact::write(
const Block & block, const IColumn::Permutation * permutation,
const Block & primary_key_block, const Block & skip_indexes_block)
{
std::cerr << "(MergeTreeDataPartWriterCompact::write) block111: " << block.dumpStructure() << "\n";
/// Fill index granularity for this block
/// if it's unknown (in case of insert data or horizontal merge,
/// but not in case of vertical merge)
@ -117,10 +115,6 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTypeAndName & column, size_t from_row, size_t number_of_rows)
{
std::cerr << "(writeColumnSingleGranule) writing column: " << column.name << "\n";
std::cerr << "(writeColumnSingleGranule) from_row: " << from_row << "\n";
std::cerr << "(writeColumnSingleGranule) number_of_rows: " << number_of_rows << "\n";
size_t old_uncompressed_size = stream->compressed.count();
writeIntBinary(stream->plain_hashing.count(), stream->marks);

View File

@ -20,7 +20,6 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
: IMergeTreeDataPartWriter(part_path_,
storage_, columns_list_, indices_to_recalc_,
marks_file_extension_, default_codec_, settings_, index_granularity_, false)
, can_use_adaptive_granularity(storage_.canUseAdaptiveGranularity())
{
const auto & columns = storage.getColumns();
for (const auto & it : columns_list)
@ -35,11 +34,10 @@ void MergeTreeDataPartWriterWide::addStreams(
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
{
if (skip_offsets && !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
if (settings.skip_offsets && !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Shared offsets for Nested type.
if (column_streams.count(stream_name))
return;
@ -54,8 +52,6 @@ void MergeTreeDataPartWriterWide::addStreams(
settings.aio_threshold);
};
std::cerr << "(addStreams) name: " << name << "\n";
IDataType::SubstreamPath stream_path;
type.enumerateStreams(callback, stream_path);
}
@ -67,7 +63,7 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
return [&, this] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
if (is_offsets && settings.skip_offsets)
return nullptr;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
@ -104,9 +100,6 @@ void MergeTreeDataPartWriterWide::write(const Block & block,
if (compute_granularity)
fillIndexGranularity(block);
std::cerr << "(MergeTreeDataPartWriterWide::write) marks_count: " << index_granularity.getMarksCount() << "\n";
std::cerr << "(MergeTreeDataPartWriterWide::write) current_mark: " << current_mark << "\n";
auto offset_columns = written_offset_columns ? *written_offset_columns : WrittenOffsetColumns{};
auto it = columns_list.begin();
@ -150,7 +143,7 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
if (is_offsets && settings.skip_offsets)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
@ -192,7 +185,7 @@ size_t MergeTreeDataPartWriterWide::writeSingleGranule(
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets && skip_offsets)
if (is_offsets && settings.skip_offsets)
return;
String stream_name = IDataType::getFileNameForStream(name, substream_path);
@ -222,10 +215,6 @@ void MergeTreeDataPartWriterWide::writeColumn(
type.serializeBinaryBulkStatePrefix(serialize_settings, it->second);
}
std::cerr << "(writeColumn) table: " << storage.getTableName() << "\n";
std::cerr << "(writeColumn) column: " << name << "\n";
std::cerr << "(writeColumn) index_offset: " << index_offset << "\n";
const auto & global_settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = createStreamGetter(name, offset_columns);

View File

@ -69,9 +69,7 @@ private:
size_t estimated_size);
SerializationStates serialization_states;
bool can_use_adaptive_granularity;
ColumnStreams column_streams;
};
}

View File

@ -718,12 +718,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const
{
std::cerr << "marks to read: ";
for (const auto & part : parts)
for (auto range : part.ranges)
std::cerr << "(" << range.begin << ", " << range.end << ") ";
/// Count marks for each part.
std::vector<size_t> sum_marks_in_parts(parts.size());
size_t sum_marks = 0;

View File

@ -190,7 +190,6 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition)
{
Block & block = block_with_partition.block;
// std::cerr << "(MergeTreeDataWriter::writeTempPart) block.rows(): " << block.rows() << "\n";
static const String TMP_PREFIX = "tmp_insert_";

View File

@ -28,6 +28,7 @@ struct MergeTreeWriterSettings
bool blocks_are_granules_size;
String filename_suffix = "";
size_t estimated_size = 0;
bool skip_offsets = false;
};
}

View File

@ -61,7 +61,7 @@ private:
};
constexpr inline auto getNonAdaptiveMrkExtension() { return ".mrk"; }
constexpr inline auto getNonAdaptiveMrkSize() { return sizeof(MarkInCompressedFile) * 2; }
constexpr inline auto getNonAdaptiveMrkSize() { return sizeof(UInt64) * 2; }
std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type);
size_t getAdaptiveMrkSize(MergeTreeDataPartType part_type, size_t columns_num);
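One plausible reading of the change above: a mark in a non-adaptive .mrk file is stored as two 64-bit offsets, so the on-disk size is expressed directly as sizeof(UInt64) * 2 rather than through the in-memory struct. A standalone sketch (hypothetical struct mirroring the fields used elsewhere in this commit):

#include <cstddef>
#include <cstdint>

// Hypothetical mirror of MarkInCompressedFile: two offsets per mark.
struct MarkSketch
{
    uint64_t offset_in_compressed_file;
    uint64_t offset_in_decompressed_block;
};

constexpr size_t getNonAdaptiveMrkSizeSketch() { return sizeof(uint64_t) * 2; }

static_assert(getNonAdaptiveMrkSizeSketch() == 16, "a non-adaptive mark is two UInt64 offsets");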

View File

@ -27,7 +27,6 @@ MergeTreeIndexGranulePtr MergeTreeIndexReader::read()
{
auto granule = index->createIndexGranule();
granule->deserializeBinary(*stream.data_buffer);
std::cerr << "(MergeTreeIndexReader) granule.empty(): " << granule->empty() << "\n";
return granule;
}

View File

@ -231,9 +231,6 @@ void MergeTreeRangeReader::ReadResult::addGranule(size_t num_rows_)
void MergeTreeRangeReader::ReadResult::adjustLastGranule()
{
std::cerr << "(adjustLastGranule) num_read_rows: " << num_read_rows << "\n";
std::cerr << "(adjustLastGranule) total_rows_per_granule: " << total_rows_per_granule << "\n";
size_t num_rows_to_subtract = total_rows_per_granule - num_read_rows;
if (rows_per_granule.empty())

View File

@ -55,10 +55,6 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(const MergeTreeData::DataPartPtr
size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
/// FIXME compute correct granularity
std::cerr << "(MergeTreeReaderCompact::readRows) max_rows_to_read: " << max_rows_to_read << "\n";
std::cerr << "(MergeTreeReaderCompact::readRows) from_mark: " << from_mark << "\n";
std::cerr << "(MergeTreeReaderCompact::readRows) continue_reading: " << continue_reading << "\n";
if (continue_reading)
from_mark = next_mark;
@ -75,7 +71,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
{
if (!column_positions[pos])
continue;
auto & [name, type] = *name_and_type;
bool append = res_columns[pos] != nullptr;
if (!append)
@ -115,10 +111,6 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
++from_mark;
read_rows += rows_to_read;
std::cerr << "(MergeTreeReaderCompact::readRows) cur mark: " << from_mark << "\n";
std::cerr << "(MergeTreeReaderCompact::readRows) read_rows: " << read_rows << "\n";
std::cerr << "(MergeTreeReaderCompact::readRows) rows_to_read: " << rows_to_read << "\n";
}
next_mark = from_mark;
@ -128,14 +120,9 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
void MergeTreeReaderCompact::readData(
const String & name, const IDataType & type, IColumn & column,
const String & /* name */, const IDataType & type, IColumn & column,
size_t from_mark, size_t column_position, size_t rows_to_read)
{
std::cerr << "(MergeTreeReaderCompact::readData) from_mark: " << from_mark << "\n";
std::cerr << "(MergeTreeReaderCompact::readData) column_position: " << column_position << "\n";
std::cerr << "(MergeTreeReaderCompact::readData) rows_to_read: " << rows_to_read << "\n";
std::cerr << "(MergeTreeReaderCompact::readData) start reading column: " << name << "\n";
/// FIXME seek only if needed
seekToMark(from_mark, column_position);
@ -147,9 +134,6 @@ void MergeTreeReaderCompact::readData(
IDataType::DeserializeBinaryBulkStatePtr state;
type.deserializeBinaryBulkStatePrefix(deserialize_settings, state);
type.deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state);
// std::cerr << "(MergeTreeReaderCompact::readData) end reading column rows: " << column.size() << "\n";
// std::cerr << "(MergeTreeReaderCompact::readData) end reading column: " << name << "\n";
}
@ -166,9 +150,6 @@ void MergeTreeReaderCompact::initMarksLoader()
size_t marks_count = data_part->getMarksCount();
size_t mark_size_in_bytes = data_part->index_granularity_info.mark_size_in_bytes;
std::cerr << "(initMarksLoader) marks_count: " << marks_count << "\n";
std::cerr << "() mark_size_in_bytes: " << mark_size_in_bytes << "\n";
size_t expected_file_size = mark_size_in_bytes * marks_count;
if (expected_file_size != file_size)
throw Exception(
@ -180,8 +161,6 @@ void MergeTreeReaderCompact::initMarksLoader()
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_num);
// std::cerr << "(MergeTreeReaderCompact::loadMarks) marks_count: " << marks_count << "\n";
ReadBufferFromFile buffer(mrk_path, file_size);
size_t i = 0;
@ -189,14 +168,9 @@ void MergeTreeReaderCompact::initMarksLoader()
{
buffer.seek(sizeof(size_t), SEEK_CUR);
buffer.readStrict(reinterpret_cast<char *>(res->data() + i * columns_num), sizeof(MarkInCompressedFile) * columns_num);
// std::cerr << "(MergeTreeReaderCompact::loadMarks) i: " << i << "\n";
// std::cerr << "(MergeTreeReaderCompact::loadMarks) buffer pos in file: " << buffer.getPositionInFile() << "\n";
++i;
}
// std::cerr << "(MergeTreeReaderCompact::loadMarks) file_size: " << file_size << "\n";
// std::cerr << "(MergeTreeReaderCompact::loadMarks) correct file size: " << i * mark_size_in_bytes << "\n";
if (i * mark_size_in_bytes != file_size)
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
@ -211,9 +185,6 @@ void MergeTreeReaderCompact::initMarksLoader()
void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
{
MarkInCompressedFile mark = marks_loader.getMark(row_index, column_index);
std::cerr << "(MergeTreeReaderCompact::seekToMark) mark: (" << mark.offset_in_compressed_file << ", " << mark.offset_in_decompressed_block << "\n";
try
{
if (cached_buffer)

View File

@ -112,13 +112,9 @@ void MergeTreeReaderStream::initMarksLoader()
auto load = [this](const String & mrk_path) -> MarkCache::MappedPtr
{
std::cerr << "reading marks from path: " << mrk_path << "\n";
std::cerr << "marks: " << marks_count << "\n";
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
std::cerr << "data_file_extension: " << data_file_extension << '\n';
size_t file_size = Poco::File(mrk_path).getSize();
size_t mark_size = mode == ReadingMode::INDEX
? index_granularity_info->skip_index_mark_size_in_bytes

View File

@ -146,8 +146,6 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
throw;
}
// std::cerr << "(MergeTreeReaderWide::readRows) read_rows: " << read_rows << "\n";
return read_rows;
}
@ -169,9 +167,6 @@ void MergeTreeReaderWide::addStreams(const String & name, const IDataType & type
if (!data_file_exists)
return;
std::cerr << "(addStreams) part: " << path << '\n';
std::cerr << "(addStreams) marks count: " << data_part->getMarksCount() << "\n";
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
path + stream_name, DATA_FILE_EXTENSION, data_part->getMarksCount(),
all_mark_ranges, settings, mark_cache,
@ -191,8 +186,6 @@ void MergeTreeReaderWide::readData(
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
bool with_offsets)
{
std::cerr << "(MergeTreeReaderWide::readData) name: " << name << "\n";
std::cerr << "(MergeTreeReaderWide::readData) max_rows_to_read: " << max_rows_to_read << "\n";
auto get_stream_getter = [&](bool stream_for_prefix) -> IDataType::InputStreamGetter
{
return [&, stream_for_prefix](const IDataType::SubstreamPath & substream_path) -> ReadBuffer *

View File

@ -51,10 +51,6 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
columns_for_reader = data_part->columns.addTypes(columns_to_read);
}
std::cerr << "(MergeTreeSequentialBlockInputStream) table: " << storage.getTableName() << "\n";
std::cerr << "(MergeTreeSequentialBlockInputStream) part: " << data_part_->getFullPath() << "\n";
std::cerr << "(MergeTreeSequentialBlockInputStream) columns_for_reader: " << columns_for_reader.toString() << "\n";
MergeTreeReaderSettings reader_settings =
{
/// This is hack
@ -136,8 +132,6 @@ try
finish();
}
std::cerr << "(MergeTreeSequentialBlockInputStream::readImpl) block: " << res.dumpStructure() << "\n";
return res;
}
catch (...)

View File

@ -70,10 +70,6 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
owned_uncompressed_cache = storage.global_context.getUncompressedCache();
owned_mark_cache = storage.global_context.getMarkCache();
std::cerr << "In Part: " << task->data_part->getFullPath() << "\n";
std::cerr << "task->columns: " << task->columns.toString() << "\n";
std::cerr << "part->columns: " << task->data_part->columns.toString() << "\n";
reader = task->data_part->getReader(task->columns, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);

View File

@ -14,26 +14,21 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
: IMergedBlockOutputStream(data_part),
header(header_), sync(sync_)
{
// std::cerr << "(MergedColumnOnlyOutputStream) storage: " << storage.getTableName() << "\n";
// std::cerr << "(MergedColumnOnlyOutputStream) can_use_adaptive_granularity: " << can_use_adaptive_granularity << "\n";
// std::cerr << "(MergedColumnOnlyOutputStream) index_granularity_info: " << !!index_granularity_info_ << "\n";
// if (index_granularity_info_)
// std::cerr << "(MergedColumnOnlyOutputStream) index_granularity_info->isAdaptive(): " << index_granularity_info_->is_adaptive << "\n";
MergeTreeWriterSettings writer_settings(
data_part->storage.global_context.getSettings(),
index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity());
writer_settings.filename_suffix = filename_suffix;
writer_settings.skip_offsets = skip_offsets_;
writer = data_part->getWriter(header.getNamesAndTypesList(), indices_to_recalc, default_codec, writer_settings, index_granularity);
writer->setOffsetColumns(offset_columns_, skip_offsets_);
writer->setWrittenOffsetColumns(offset_columns_);
writer->initSkipIndices();
}
void MergedColumnOnlyOutputStream::write(const Block & block)
{
std::set<String> skip_indexes_column_names_set;
for (const auto & index : storage.skip_indices) /// FIXME save local indices
for (const auto & index : writer->getSkipIndices())
std::copy(index->columns.cbegin(), index->columns.cend(),
std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());
@ -44,8 +39,6 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
if (!rows)
return;
std::cerr << "(MergedColumnOnlyOutputStream::write) writing rows: " << rows << "\n";
writer->write(block);
writer->calculateAndSerializeSkipIndices(skip_indexes_block, rows);
writer->next();

View File

@ -3,7 +3,7 @@
#include <Columns/ColumnVector.h>
// I know that inclusion of .cpp is not good at all
#include <Storages/MergeTree/IMergedBlockOutputStream.cpp>
#include <Storages/MergeTree/IMergeTreeDataPartWriter.cpp>
using namespace DB;
static Block getBlockWithSize(size_t required_size_in_bytes, size_t size_of_row_in_bytes)

View File

@ -1,5 +1,5 @@
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <DataTypes/DataTypeDate.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
@ -80,7 +80,7 @@ void run(String part_path, String date_column, String dest_path)
checksums.files["count.txt"].file_size = count_out_hashing.count();
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
MergeTreeDataPart::MinMaxIndex minmax_idx(min_date, max_date);
IMergeTreeDataPart::MinMaxIndex minmax_idx(min_date, max_date);
Names minmax_idx_columns = {date_column};
DataTypes minmax_idx_column_types = {std::make_shared<DataTypeDate>()};
minmax_idx.store(minmax_idx_columns, minmax_idx_column_types, new_tmp_part_path_str, checksums);