Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 172 additions & 17 deletions src/index/CompressedRelation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,16 @@ CompressedRelationReader::asyncParallelBlockGenerator(
blockMetadata, scanConfig_.scanColumns_);

lock.unlock();
auto decompressedBlockAndMetadata =
reader_->decompressAndPostprocessBlock(compressedBlock,
blockMetadata.numRows_,
scanConfig_, blockMetadata);

// If this block's data is shared from another permutation, delegate.
auto decompressedBlockAndMetadata = [&]() {
if (blockMetadata.sharingInfo_.has_value()) {
return reader_->readSharedBlock(blockMetadata, scanConfig_);
}
return reader_->decompressAndPostprocessBlock(
compressedBlock, blockMetadata.numRows_, scanConfig_,
blockMetadata);
}();
return std::pair{myIndex,
std::optional{std::move(decompressedBlockAndMetadata)}};
};
Expand Down Expand Up @@ -1208,13 +1214,111 @@ CompressedRelationReader::readAndDecompressBlock(
if (scanConfig.graphFilter_.canBlockBeSkipped(blockMetaData)) {
return std::nullopt;
}
// If this block's data is shared from another permutation, delegate.
if (blockMetaData.sharingInfo_.has_value()) {
return readSharedBlock(blockMetaData, scanConfig);
}
CompressedBlock compressedColumns =
readCompressedBlockFromFile(blockMetaData, scanConfig.scanColumns_);
const auto numRowsToRead = blockMetaData.numRows_;
return decompressAndPostprocessBlock(compressedColumns, numRowsToRead,
scanConfig, blockMetaData);
}

// ____________________________________________________________________________
// Materialize a block whose data is not stored in this permutation's file but
// is shared from a block of another permutation (`blockMetaData.sharingInfo_`
// must be set and must reference a valid source block).
//
// The requested `scanConfig.scanColumns_` are translated to the column indices
// of the source block, read and decompressed via the source permutation's
// reader, and re-sorted into the order of this permutation. Afterwards the
// same postprocessing as for ordinary blocks is applied: merging of located
// triples and graph filtering / graph column removal.
//
// Currently only `BlockSharingInfo::Type::SisterPermutation` is supported (the
// source block has block columns 1 and 2 exchanged). Cross-pair sharing (which
// would exchange block columns 0 and 1 and would not need a resort) is
// rejected with a correctness check.
DecompressedBlockAndMetadata CompressedRelationReader::readSharedBlock(
    const CompressedBlockMetadata& blockMetaData,
    const ScanImplConfig& scanConfig) const {
  AD_CONTRACT_CHECK(blockMetaData.sharingInfo_.has_value());
  const auto& sharing = blockMetaData.sharingInfo_.value();
  AD_CONTRACT_CHECK(sharing.sourceBlockIndex_ !=
                    std::numeric_limits<size_t>::max());

  // Everything below is specific to sister-permutation sharing, so reject all
  // other sharing types once, up front.
  AD_CORRECTNESS_CHECK(
      sharing.type_ == BlockSharingInfo::Type::SisterPermutation,
      "Cross-pair sharing is not yet implemented.");
  AD_CONTRACT_CHECK(sisterAccess_.has_value());
  const SharedPermutationAccess& access = sisterAccess_.value();

  AD_CONTRACT_CHECK(sharing.sourceBlockIndex_ < access.blocks->size());
  const auto& sourceBlock = access.blocks->at(sharing.sourceBlockIndex_);
  AD_CONTRACT_CHECK(!sourceBlock.sharingInfo_.has_value(),
                    "Source block must not itself be shared.");

  // Translate each requested column to the column of the source block that
  // holds its data: block columns 1 and 2 are exchanged in the sister
  // permutation, all other columns are stored at the same index. Translating
  // *before* reading (instead of swapping afterwards) also yields the correct
  // data when only one of the two columns is requested.
  constexpr size_t numTripleColumns = 3;
  std::vector<ColumnIndex> sourceColumns;
  sourceColumns.reserve(scanConfig.scanColumns_.size());
  // Number of requested triple columns (block columns 0, 1, 2). The graph
  // column and payload columns are deliberately not counted.
  size_t numIndexColumns = 0;
  for (ColumnIndex column : scanConfig.scanColumns_) {
    if (column < numTripleColumns) {
      ++numIndexColumns;
    }
    if (column == ColumnIndex{1}) {
      sourceColumns.push_back(ColumnIndex{2});
    } else if (column == ColumnIndex{2}) {
      sourceColumns.push_back(ColumnIndex{1});
    } else {
      sourceColumns.push_back(column);
    }
  }

  CompressedBlock compressedColumns =
      access.reader->readCompressedBlockFromFile(sourceBlock, sourceColumns);
  DecompressedBlock decompressed =
      access.reader->decompressBlock(compressedColumns, sourceBlock.numRows_);

  // Find the position of the graph column in the decompressed table. It is
  // required to be part of the scan columns.
  auto itGraph = ql::ranges::find(scanConfig.scanColumns_,
                                  ADDITIONAL_COLUMN_GRAPH_ID);
  AD_CORRECTNESS_CHECK(itGraph != scanConfig.scanColumns_.end());
  const size_t graphPos = itGraph - scanConfig.scanColumns_.begin();

  // The rows are still in the order of the source permutation. Resort by all
  // columns up to and including the graph column (positions 0 .. graphPos).
  // NOTE(review): this assumes that the triple columns precede the graph
  // column in `scanColumns_` and that omitted leading triple columns are
  // constant within the block -- confirm against the callers.
  auto compare = [graphPos](const auto& a, const auto& b) {
    for (size_t i = 0; i <= graphPos; ++i) {
      if (a[i] != b[i]) {
        return a[i] < b[i];
      }
    }
    return false;
  };
  ql::ranges::sort(decompressed, compare);

  // Now postprocess (located triples merging, graph filtering).
  bool hasUpdates = false;
  // Guaranteed by the check on `itGraph` above.
  const bool includeGraphColumn = true;

  auto& locatedTriples = scanConfig.locatedTriples_;
  if (locatedTriples.containsTriples(blockMetaData.blockIndex_)) {
    // `mergeTriples` returns the merged table, so the result must replace the
    // block; otherwise the updates are lost although `hasUpdates` is reported.
    decompressed =
        locatedTriples.mergeTriples(blockMetaData.blockIndex_, decompressed,
                                    numIndexColumns, includeGraphColumn);
    hasUpdates = true;
  }
  bool wasPostprocessed = false;
  if (useGraphPostProcessing_) {
    wasPostprocessed =
        scanConfig.graphFilter_.postprocessBlock(decompressed, blockMetaData);
  } else {
    scanConfig.graphFilter_.deleteGraphColumnIfNecessary(decompressed);
  }
  return DecompressedBlockAndMetadata{std::move(decompressed), wasPostprocessed,
                                      hasUpdates};
}

// ____________________________________________________________________________
CompressedBlockMetadata::OffsetAndCompressedSize
CompressedRelationWriter::compressAndWriteColumn(ql::span<const Id> column) {
Expand Down Expand Up @@ -1274,32 +1378,83 @@ void CompressedRelationWriter::compressAndWriteBlock(Id firstCol0Id,
auto timer = blockWriteQueueTimer_.startMeasurement();
blockWriteQueue_.push([this, block = std::move(block), firstCol0Id,
lastCol0Id, invokeCallback]() mutable {
std::vector<CompressedBlockMetadata::OffsetAndCompressedSize> offsets;
for (const auto& column : block.getColumns()) {
offsets.push_back(compressAndWriteColumn(column));
}
AD_CORRECTNESS_CHECK(!offsets.empty());
auto numRows = block.numRows();
const auto& first = block[0];
const auto& last = block[numRows - 1];
AD_CORRECTNESS_CHECK(firstCol0Id == first[0]);
AD_CORRECTNESS_CHECK(lastCol0Id == last[0]);

// Check if this block has a constant prefix of length >= 2 and should be
// stored as a cross-pair sharing dummy. We only do this for large-relation
// blocks (invokeCallback=false), because small-relation blocks are shared
// with the sister permutation via the sister callback, and creating a
// cross-pair dummy would break that reference chain.
if (skipConstantPrefixBlocks_ && !invokeCallback && first[0] == last[0] &&
first[1] == last[1]) {
auto [hasDuplicates, graphInfo] = getGraphInfo(block);
blockBuffer_.wlock()->emplace_back(CompressedBlockMetadataNoBlockIndex{
std::nullopt,
numRows,
{first[0], first[1], first[2], first[3]},
{last[0], last[1], last[2], last[3]},
std::move(graphInfo),
hasDuplicates,
BlockSharingInfo{BlockSharingInfo::Type::CrossPairPermutation,
std::numeric_limits<size_t>::max()}});
return;
}

std::vector<CompressedBlockMetadata::OffsetAndCompressedSize> offsets;
for (const auto& column : block.getColumns()) {
offsets.push_back(compressAndWriteColumn(column));
}
AD_CORRECTNESS_CHECK(!offsets.empty());

auto [hasDuplicates, graphInfo] = getGraphInfo(block);
blockBuffer_.wlock()->emplace_back(CompressedBlockMetadataNoBlockIndex{
std::move(offsets),
numRows,
{first[0], first[1], first[2], first[3]},
{last[0], last[1], last[2], last[3]},
std::move(graphInfo),
hasDuplicates});
size_t bufIdx;
{
auto locked = blockBuffer_.wlock();
locked->emplace_back(CompressedBlockMetadataNoBlockIndex{
std::move(offsets),
numRows,
{first[0], first[1], first[2], first[3]},
{last[0], last[1], last[2], last[3]},
std::move(graphInfo),
hasDuplicates,
std::nullopt});
bufIdx = locked->size() - 1;
}
if (invokeCallback && smallBlocksCallback_) {
std::invoke(smallBlocksCallback_, std::move(block));
std::invoke(smallBlocksCallback_, std::move(block), bufIdx);
}
});
timer.stop();
}

// _____________________________________________________________________________
// Register the metadata of a block whose data is NOT written by this writer
// because it is shared from another block (described by `sharingInfo`). The
// entry stores `std::nullopt` in the slot that holds the column offsets for
// ordinary blocks. `block` is only inspected to derive the row count, the
// first and last row, and the graph info; it must not be empty, and
// `firstCol0Id` / `lastCol0Id` must match column 0 of its first / last row.
// Returns the position of the new entry inside `blockBuffer_`.
size_t CompressedRelationWriter::addSharedBlockMetadata(
    Id firstCol0Id, Id lastCol0Id, const IdTable& block,
    BlockSharingInfo sharingInfo) {
  const auto rowCount = block.numRows();
  AD_CORRECTNESS_CHECK(rowCount > 0);

  const auto& firstRow = block[0];
  const auto& lastRow = block[rowCount - 1];
  AD_CORRECTNESS_CHECK(firstCol0Id == firstRow[0]);
  AD_CORRECTNESS_CHECK(lastCol0Id == lastRow[0]);

  // Compute the graph info before acquiring the write lock on the buffer.
  auto [containsDuplicates, graphs] = getGraphInfo(block);

  auto buffer = blockBuffer_.wlock();
  // The new entry is appended at the end, so its index is the current size.
  const size_t newEntryIndex = buffer->size();
  buffer->emplace_back(CompressedBlockMetadataNoBlockIndex{
      std::nullopt,
      rowCount,
      {firstRow[0], firstRow[1], firstRow[2], firstRow[3]},
      {lastRow[0], lastRow[1], lastRow[2], lastRow[3]},
      std::move(graphs),
      containsDuplicates,
      std::move(sharingInfo)});
  return newEntryIndex;
}

// _____________________________________________________________________________
size_t CompressedRelationReader::getNumberOfBlockMetadataValues(
const BlockMetadataRanges& blockMetadata) {
Expand Down
Loading
Loading