-
Notifications
You must be signed in to change notification settings - Fork 102
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IGNITE-24702 Fix of CompactedException on the end of rebalancing #5468
base: main
Are you sure you want to change the base?
Conversation
e7caa72
to
28c9676
Compare
MetaStorageManager metaStorageManager = ignite.metaStorageManager(); | ||
|
||
// Wait for the rebalancing to finish. | ||
assertTrue(waitForCondition(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's check that stable has changed, something like
assertValueInStorage(
metaStorageManager,
stablePartAssignmentsKey(partId),
(v) -> Assignments.fromBytes(v).nodes()
.stream().map(Assignment::consistentId).collect(Collectors.toSet()),
Set.of(node(0).name()),
TIMEOUT_MILLIS
);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here the pending queue is checked for tombstone
, not for emptyness, so it should work correctly
throw new IllegalArgumentException("causalityToken must be greater then zero [causalityToken=" + causalityToken + '"'); | ||
} | ||
|
||
public CompletableFuture<Set<String>> dataNodes(HybridTimestamp timestamp, int catalogVersion, int zoneId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadoc must be updated accordingly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -43,21 +46,22 @@ static class HashIndexDescriptorSerializerV1 implements CatalogObjectSerializer< | |||
public CatalogHashIndexDescriptor readFrom(CatalogObjectDataInput input) throws IOException { | |||
int id = input.readVarIntAsInt(); | |||
String name = input.readUTF(); | |||
long updateToken = input.readVarInt(); | |||
long updateTimestampLong = input.readVarInt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not understand, why do you change HashIndexDescriptorSerializerV1
? You brake compatibility with this changes. As far as I can understand, you only need to change HashIndexDescriptorSerializerV2
But please check with sql folks regarding the correct solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't imply any changes in underlying storage, and caused only by the changes in the object. Now the long
taken from storage is interpreted in another way: it's converted into timestamp, but this doesn't break anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We agreed to fix this by reading old token as initial timestamp.
@@ -57,13 +60,14 @@ public CatalogSchemaDescriptor readFrom(CatalogObjectDataInput input) throws IOE | |||
|
|||
int id = input.readVarIntAsInt(); | |||
String name = input.readUTF(); | |||
long updateToken = input.readVarInt(); | |||
long updateTimestampLong = input.readVarInt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same question as for HashIndexDescriptorSerializerV2
@@ -50,21 +53,22 @@ public SortedIndexDescriptorSerializerV1(CatalogEntrySerializerProvider serializ | |||
public CatalogSortedIndexDescriptor readFrom(CatalogObjectDataInput input) throws IOException { | |||
int id = input.readVarIntAsInt(); | |||
String name = input.readUTF(); | |||
long updateToken = input.readVarInt(); | |||
long updateTimestampLong = input.readVarInt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same question as for HashIndexDescriptorSerializerV2
@@ -53,14 +56,15 @@ public CatalogSystemViewDescriptor readFrom(CatalogObjectDataInput input) throws | |||
int id = input.readVarIntAsInt(); | |||
int schemaId = input.readVarIntAsInt(); | |||
String name = input.readUTF(); | |||
long updateToken = input.readVarInt(); | |||
long updateTimestampLong = input.readVarInt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same question as for HashIndexDescriptorSerializerV2
@@ -49,7 +52,8 @@ public TableDescriptorSerializerV1(CatalogEntrySerializerProvider serializers) { | |||
public CatalogTableDescriptor readFrom(CatalogObjectDataInput input) throws IOException { | |||
int id = input.readVarIntAsInt(); | |||
String name = input.readUTF(); | |||
long updateToken = input.readVarInt(); | |||
long updateTimestampLong = input.readVarInt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same question as for HashIndexDescriptorSerializerV2
@@ -44,7 +48,8 @@ public ZoneDescriptorSerializerV1(CatalogEntrySerializerProvider serializers) { | |||
public CatalogZoneDescriptor readFrom(CatalogObjectDataInput input) throws IOException { | |||
int id = input.readVarIntAsInt(); | |||
String name = input.readUTF(); | |||
long updateToken = input.readVarInt(); | |||
long updateTimestampLong = input.readVarInt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same question as for HashIndexDescriptorSerializerV2
@@ -597,6 +597,9 @@ public static CompletableFuture<Void> handleReduceChanged( | |||
ByteArray changeTriggerKey = ZoneRebalanceUtil.pendingChangeTriggerKey(partId); | |||
byte[] rev = ByteUtils.longToBytesKeepingOrder(entry.revision()); | |||
|
|||
ByteArray changeTimestampKey = ZoneRebalanceUtil.pendingChangeTriggerKey(partId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you introduce new key? can't we reuse changeTriggerKey
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But this is the one that existed before
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need changeTimestampKey and why do you need further changes like this
Condition changeRevisionAndTimestampDontExistOrLessThan = and(
or(notExists(changeTriggerKey), value(changeTriggerKey).lt(rev)),
or(notExists(changeTimestampKey), value(changeTimestampKey).lt(timestamp))
);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why can't we use changeTriggerKey
and store timestamp using this key?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
obsolete changes, removed, thx
@@ -191,7 +195,8 @@ static class TableDescriptorSerializerV2 implements CatalogObjectSerializer<Cata | |||
public CatalogTableDescriptor readFrom(CatalogObjectDataInput input) throws IOException { | |||
int id = input.readVarIntAsInt(); | |||
String name = input.readUTF(); | |||
long updateToken = input.readVarInt(); | |||
long updateTimestampLong = input.readVarInt(); | |||
HybridTimestamp updateTimestamp = updateTimestampLong == 0 ? MIN_VALUE : hybridTimestamp(updateTimestampLong); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this necessary? How can it happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's necessary for correct processing of 0
value that may be found in the old storage to convert it to timestamp.
@@ -57,7 +58,7 @@ public class CatalogZoneDescriptor extends CatalogObjectDescriptor implements Ma | |||
* Returns {@code true} if zone upgrade will lead to assignments recalculation. | |||
*/ | |||
public static boolean updateRequiresAssignmentsRecalculation(CatalogZoneDescriptor oldDescriptor, CatalogZoneDescriptor newDescriptor) { | |||
if (oldDescriptor.updateToken() == newDescriptor.updateToken()) { | |||
if (oldDescriptor.updateTimestamp() == newDescriptor.updateTimestamp()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it OK to use reference equality here? This might be a bug, every reference equality check must be justified with a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This most likely was missed due to renaming, should be fixed.
list.add(list.get(list.size() - 1).newDescriptor(table1.name() + "_2", 3, columns(state).subList(0, 20), hybridTimestamp(21232L), | ||
"S1")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very unconventional formatting, please make it look better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@@ -172,9 +156,9 @@ Map<Integer, NavigableMap<Long, CatalogZoneDescriptor>> allZonesByTimestamp( | |||
NavigableMap<Long, CatalogZoneDescriptor> map = allZones.computeIfAbsent(zone.id(), id -> new TreeMap<>()); | |||
|
|||
if (map.isEmpty() || updateRequiresAssignmentsRecalculation(map.lastEntry().getValue(), zone)) { | |||
map.put(catalog.time(), zone); | |||
map.put(zone.updateTimestamp().longValue(), zone); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use HybridTimestamp
here for keys? This will make code more readable. This class is Comparable
, so no worries about that. Please update all the usages of Long
instead of HybridTimestamp
, thank you!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -88,6 +88,11 @@ public DataNodesHistoryEntry dataNodesForTimestamp(HybridTimestamp timestamp) { | |||
Map.Entry<HybridTimestamp, Set<NodeWithAttributes>> entry = history.floorEntry(timestamp); | |||
|
|||
if (entry == null) { | |||
if (timestamp.equals(HybridTimestamp.MIN_VALUE)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's use INITIAL_TIMESTAMP
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not like solution when in dataNodes we handle INITIAL_TIMESTAMP
that came from old PDS as history.firstEntry();
and return it as data nodes. From my point of view, we need to return data nodes bounded to catalog version that is passed to DistributionZoneManager#dataNodes((HybridTimestamp timestamp, int catalogVersion, int zoneId))
. We could take org.apache.ignite.internal.catalog.Catalog#time
and use it as timestamp for retrieving data nodes.
However, after discussion with @ibessonov, I think that we could proceed with the current solution, but with obligation to fix all compatibility issues before 3.1 release
|
||
CatalogTableDescriptor[] tables = readArray(tableDescriptorSerializer, input, CatalogTableDescriptor.class); | ||
CatalogIndexDescriptor[] indexes = readArray(indexSerializeHelper, input, CatalogIndexDescriptor.class); | ||
CatalogSystemViewDescriptor[] systemViews = readArray(viewDescriptorSerializer, input, CatalogSystemViewDescriptor.class); | ||
|
||
return new CatalogSchemaDescriptor(id, name, tables, indexes, systemViews, updateToken); | ||
// Here we use the initial timestamp because it's old storage. This value will be processed by data nodes manager. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is correct only for ZoneDescriptor serialiser, because only ZoneDescriptor actually uses updateTimestamp and uses it in data nodes manager. For all other serialisers, like for TableDescriptor and etc, phrase like "This value will be processed by data nodes manager." is incorrect. Please, fix comments
https://issues.apache.org/jira/browse/IGNITE-24702