
IGNITE-24702 Fix of CompactedException on the end of rebalancing #5468

Open
wants to merge 11 commits into
base: main

Conversation

@denis-chudov changed the title from "IGNITE-24702" to "IGNITE-24702 Fix of CompactedException on the end of rebalancing" on Mar 21, 2025
MetaStorageManager metaStorageManager = ignite.metaStorageManager();

// Wait for the rebalancing to finish.
assertTrue(waitForCondition(() -> {
Contributor:
let's check that the stable assignments have changed, something like:

assertValueInStorage(
                metaStorageManager,
                stablePartAssignmentsKey(partId),
                (v) -> Assignments.fromBytes(v).nodes()
                        .stream().map(Assignment::consistentId).collect(Collectors.toSet()),
                Set.of(node(0).name()),
                TIMEOUT_MILLIS
        );

Contributor Author (denis-chudov), Mar 25, 2025:
Here the pending queue is checked for a tombstone, not for emptiness, so it should work correctly.
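
For context, a minimal sketch of the kind of check being described, assuming a pendingPartAssignmentsKey helper by analogy with stablePartAssignmentsKey from the snippet above (the actual helper and key names in the test may differ):

// Sketch: wait until the pending assignments entry becomes a tombstone,
// i.e. the pending queue entry was removed at the end of rebalancing.
assertTrue(waitForCondition(() -> {
    Entry pendingEntry = metaStorageManager
            .get(pendingPartAssignmentsKey(partId)) // assumed helper, analogous to stablePartAssignmentsKey
            .join();

    return pendingEntry.tombstone();
}, TIMEOUT_MILLIS));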

throw new IllegalArgumentException("causalityToken must be greater than zero [causalityToken=" + causalityToken + ']');
}

public CompletableFuture<Set<String>> dataNodes(HybridTimestamp timestamp, int catalogVersion, int zoneId) {
Contributor:
javadoc must be updated accordingly

Contributor Author (denis-chudov):
done

@@ -43,21 +46,22 @@ static class HashIndexDescriptorSerializerV1 implements CatalogObjectSerializer<
public CatalogHashIndexDescriptor readFrom(CatalogObjectDataInput input) throws IOException {
int id = input.readVarIntAsInt();
String name = input.readUTF();
long updateToken = input.readVarInt();
long updateTimestampLong = input.readVarInt();
Contributor:
I do not understand why you changed HashIndexDescriptorSerializerV1. You break compatibility with these changes. As far as I can understand, you only need to change HashIndexDescriptorSerializerV2.
Please also check with the SQL folks regarding the correct solution.

Contributor Author (denis-chudov):
This doesn't imply any changes in the underlying storage; it is caused only by the changes in the object. The long read from storage is now interpreted differently: it's converted into a timestamp, but this doesn't break anything.

Contributor Author (denis-chudov):
We agreed to fix this by reading the old token as the initial timestamp.
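
For illustration, a rough sketch of what reading the old token as the initial timestamp could look like in a V1 serializer; the exact code in the PR may differ, and the variable name legacyUpdateToken is hypothetical:

// The V1 on-disk format still contains the old update token, so the stored layout is unchanged.
// The value itself is no longer used directly; descriptors from old storage fall back to the initial timestamp.
long legacyUpdateToken = input.readVarInt();
HybridTimestamp updateTimestamp = HybridTimestamp.MIN_VALUE; // the "initial timestamp" for old PDS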

@@ -57,13 +60,14 @@ public CatalogSchemaDescriptor readFrom(CatalogObjectDataInput input) throws IOE

int id = input.readVarIntAsInt();
String name = input.readUTF();
long updateToken = input.readVarInt();
long updateTimestampLong = input.readVarInt();
Contributor:
The same question as for HashIndexDescriptorSerializerV2

@@ -50,21 +53,22 @@ public SortedIndexDescriptorSerializerV1(CatalogEntrySerializerProvider serializ
public CatalogSortedIndexDescriptor readFrom(CatalogObjectDataInput input) throws IOException {
int id = input.readVarIntAsInt();
String name = input.readUTF();
long updateToken = input.readVarInt();
long updateTimestampLong = input.readVarInt();
Contributor:
the same question as for HashIndexDescriptorSerializerV2

@@ -53,14 +56,15 @@ public CatalogSystemViewDescriptor readFrom(CatalogObjectDataInput input) throws
int id = input.readVarIntAsInt();
int schemaId = input.readVarIntAsInt();
String name = input.readUTF();
long updateToken = input.readVarInt();
long updateTimestampLong = input.readVarInt();
Contributor:
the same question as for HashIndexDescriptorSerializerV2

@@ -49,7 +52,8 @@ public TableDescriptorSerializerV1(CatalogEntrySerializerProvider serializers) {
public CatalogTableDescriptor readFrom(CatalogObjectDataInput input) throws IOException {
int id = input.readVarIntAsInt();
String name = input.readUTF();
long updateToken = input.readVarInt();
long updateTimestampLong = input.readVarInt();
Contributor:
the same question as for HashIndexDescriptorSerializerV2

@@ -44,7 +48,8 @@ public ZoneDescriptorSerializerV1(CatalogEntrySerializerProvider serializers) {
public CatalogZoneDescriptor readFrom(CatalogObjectDataInput input) throws IOException {
int id = input.readVarIntAsInt();
String name = input.readUTF();
long updateToken = input.readVarInt();
long updateTimestampLong = input.readVarInt();
Contributor:
the same question as for HashIndexDescriptorSerializerV2

@@ -597,6 +597,9 @@ public static CompletableFuture<Void> handleReduceChanged(
ByteArray changeTriggerKey = ZoneRebalanceUtil.pendingChangeTriggerKey(partId);
byte[] rev = ByteUtils.longToBytesKeepingOrder(entry.revision());

ByteArray changeTimestampKey = ZoneRebalanceUtil.pendingChangeTriggerKey(partId);
Contributor:
Why did you introduce a new key? Can't we reuse changeTriggerKey?

Contributor Author (denis-chudov):
But this is the one that existed before

Contributor:
Why do you need changeTimestampKey, and why do you need further changes like this?

        Condition changeRevisionAndTimestampDontExistOrLessThan = and(
                or(notExists(changeTriggerKey), value(changeTriggerKey).lt(rev)),
                or(notExists(changeTimestampKey), value(changeTimestampKey).lt(timestamp))
        );

Contributor:
Why can't we use changeTriggerKey and store the timestamp under this key?

Contributor Author (denis-chudov):
obsolete changes, removed, thx

@@ -191,7 +195,8 @@ static class TableDescriptorSerializerV2 implements CatalogObjectSerializer<Cata
public CatalogTableDescriptor readFrom(CatalogObjectDataInput input) throws IOException {
int id = input.readVarIntAsInt();
String name = input.readUTF();
long updateToken = input.readVarInt();
long updateTimestampLong = input.readVarInt();
HybridTimestamp updateTimestamp = updateTimestampLong == 0 ? MIN_VALUE : hybridTimestamp(updateTimestampLong);
Contributor:
Why is this necessary? How can it happen?

Contributor Author (denis-chudov):
It's necessary to correctly process a 0 value that may be found in old storage and convert it to a timestamp.

@@ -57,7 +58,7 @@ public class CatalogZoneDescriptor extends CatalogObjectDescriptor implements Ma
* Returns {@code true} if zone upgrade will lead to assignments recalculation.
*/
public static boolean updateRequiresAssignmentsRecalculation(CatalogZoneDescriptor oldDescriptor, CatalogZoneDescriptor newDescriptor) {
if (oldDescriptor.updateToken() == newDescriptor.updateToken()) {
if (oldDescriptor.updateTimestamp() == newDescriptor.updateTimestamp()) {
Contributor:
Why is it OK to use reference equality here? This might be a bug; every reference equality check must be justified with a comment.

Contributor Author (denis-chudov):
This was most likely missed during the renaming; it should be fixed.
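
A minimal sketch of the likely fix, comparing the timestamps by value instead of by reference (the rest of the method would stay as in the original):

// HybridTimestamp is an object, so the comparison must use equals(), not ==.
if (oldDescriptor.updateTimestamp().equals(newDescriptor.updateTimestamp())) {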

Comment on lines 136 to 137
list.add(list.get(list.size() - 1).newDescriptor(table1.name() + "_2", 3, columns(state).subList(0, 20), hybridTimestamp(21232L),
"S1"));
Contributor:
Very unconventional formatting, please make it look better

Contributor Author (denis-chudov):
fixed

@@ -172,9 +156,9 @@ Map<Integer, NavigableMap<Long, CatalogZoneDescriptor>> allZonesByTimestamp(
NavigableMap<Long, CatalogZoneDescriptor> map = allZones.computeIfAbsent(zone.id(), id -> new TreeMap<>());

if (map.isEmpty() || updateRequiresAssignmentsRecalculation(map.lastEntry().getValue(), zone)) {
map.put(catalog.time(), zone);
map.put(zone.updateTimestamp().longValue(), zone);
Contributor (ibessonov), Mar 25, 2025:
Can we use HybridTimestamp for the keys here? It will make the code more readable. The class is Comparable, so no worries there. Please update all the usages where Long is used instead of HybridTimestamp, thank you!

Contributor Author (denis-chudov):
done
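
For illustration, a sketch of the HybridTimestamp-keyed map the reviewer asks for (the declared return type of allZonesByTimestamp would change accordingly):

// HybridTimestamp implements Comparable, so it can key the NavigableMap directly.
NavigableMap<HybridTimestamp, CatalogZoneDescriptor> map = allZones.computeIfAbsent(zone.id(), id -> new TreeMap<>());

if (map.isEmpty() || updateRequiresAssignmentsRecalculation(map.lastEntry().getValue(), zone)) {
    map.put(zone.updateTimestamp(), zone);
}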

@@ -88,6 +88,11 @@ public DataNodesHistoryEntry dataNodesForTimestamp(HybridTimestamp timestamp) {
Map.Entry<HybridTimestamp, Set<NodeWithAttributes>> entry = history.floorEntry(timestamp);

if (entry == null) {
if (timestamp.equals(HybridTimestamp.MIN_VALUE)) {
Contributor:
let's use INITIAL_TIMESTAMP

Contributor (alievmirza) left a comment:
I do not like the solution where, in dataNodes, we handle an INITIAL_TIMESTAMP that came from an old PDS by returning history.firstEntry() as the data nodes. From my point of view, we need to return data nodes bound to the catalog version that is passed to DistributionZoneManager#dataNodes(HybridTimestamp timestamp, int catalogVersion, int zoneId). We could take org.apache.ignite.internal.catalog.Catalog#time and use it as the timestamp for retrieving data nodes.

However, after a discussion with @ibessonov, I think that we can proceed with the current solution, but with an obligation to fix all compatibility issues before the 3.1 release.
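
The behaviour under discussion, sketched from the diff above (the reviewer suggests an INITIAL_TIMESTAMP constant instead of the MIN_VALUE literal; the exact fallback handling in the PR may differ):

Map.Entry<HybridTimestamp, Set<NodeWithAttributes>> entry = history.floorEntry(timestamp);

if (entry == null) {
    // A timestamp equal to the initial timestamp comes from descriptors restored from an old PDS;
    // in that case the current fix falls back to the earliest known history entry.
    if (timestamp.equals(HybridTimestamp.MIN_VALUE)) {
        entry = history.firstEntry();
    }
}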


CatalogTableDescriptor[] tables = readArray(tableDescriptorSerializer, input, CatalogTableDescriptor.class);
CatalogIndexDescriptor[] indexes = readArray(indexSerializeHelper, input, CatalogIndexDescriptor.class);
CatalogSystemViewDescriptor[] systemViews = readArray(viewDescriptorSerializer, input, CatalogSystemViewDescriptor.class);

return new CatalogSchemaDescriptor(id, name, tables, indexes, systemViews, updateToken);
// Here we use the initial timestamp because it's old storage. This value will be processed by data nodes manager.
Contributor:
This comment is correct only for the ZoneDescriptor serializer, because only ZoneDescriptor actually uses updateTimestamp and feeds it into the data nodes manager. For all other serializers, such as the TableDescriptor one, a phrase like "This value will be processed by data nodes manager." is incorrect. Please fix the comments.
