Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24702 Fix of CompactedException on the end of rebalancing #5468

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ void testGlobalMinimumTxRequiredTime() {
}

@Test
public void testCompactionRun() throws InterruptedException {
public void testCompactionRun() {
sql(format("create zone if not exists test with partitions={}, replicas={}, storage_profiles='default'",
CLUSTER_SIZE, CLUSTER_SIZE)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public List<UpdateEntry> get(UpdateContext updateContext) {

for (UpdateEntry entry : entries) {
updateContext.updateCatalog(
catalog -> entry.applyUpdate(catalog, CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN)
catalog -> entry.applyUpdate(catalog, CatalogManagerImpl.INITIAL_TIMESTAMP)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata
private static final int MAX_RETRY_COUNT = 10;

/**
* Initial update token for a catalog descriptor, this token is valid only before the first call of
* {@link UpdateEntry#applyUpdate(Catalog, long)}.
* Initial update timestamp for a catalog descriptor, this token is valid only before the first call of
* {@link UpdateEntry#applyUpdate(Catalog, HybridTimestamp)}.
*
* <p>After that {@link CatalogObjectDescriptor#updateToken()} will be initialised with a causality token from
* {@link UpdateEntry#applyUpdate(Catalog, long)}
* <p>After that {@link CatalogObjectDescriptor#updateTimestamp()} will be initialised with a timestamp from
* {@link UpdateEntry#applyUpdate(Catalog, HybridTimestamp)}
*/
public static final long INITIAL_CAUSALITY_TOKEN = 0L;
public static final HybridTimestamp INITIAL_TIMESTAMP = HybridTimestamp.MIN_VALUE;

/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(CatalogManagerImpl.class);
Expand Down Expand Up @@ -405,7 +405,7 @@ private CompletableFuture<CatalogApplyResult> saveUpdate(List<UpdateProducer> up
}

for (UpdateEntry entry : entries) {
updateContext.updateCatalog(cat -> entry.applyUpdate(cat, INITIAL_CAUSALITY_TOKEN));
updateContext.updateCatalog(cat -> entry.applyUpdate(cat, INITIAL_TIMESTAMP));
}

applyResults.set(i);
Expand Down Expand Up @@ -476,7 +476,7 @@ private CompletableFuture<Void> handle(VersionedUpdate update, HybridTimestamp m
assert catalog != null : version - 1;

for (UpdateEntry entry : update.entries()) {
catalog = entry.applyUpdate(catalog, causalityToken);
catalog = entry.applyUpdate(catalog, metaStorageUpdateTimestamp);
}

catalog = applyUpdateFinal(catalog, update, metaStorageUpdateTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ public static CatalogSchemaDescriptor replaceTable(CatalogSchemaDescriptor schem
tableDescriptors,
schema.indexes(),
schema.systemViews(),
newTableDescriptor.updateToken()
newTableDescriptor.updateTimestamp()
);
}
}
Expand Down Expand Up @@ -398,7 +398,7 @@ public static CatalogSchemaDescriptor replaceIndex(CatalogSchemaDescriptor schem
schema.tables(),
indexDescriptors,
schema.systemViews(),
newIndexDescriptor.updateToken()
newIndexDescriptor.updateTimestamp()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.ignite.internal.catalog.commands;

import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_TIMESTAMP;
import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateIdentifier;

import java.util.List;
Expand Down Expand Up @@ -83,7 +83,7 @@ public List<UpdateEntry> get(UpdateContext updateContext) {
new CatalogTableDescriptor[0],
new CatalogIndexDescriptor[0],
new CatalogSystemViewDescriptor[0],
INITIAL_CAUSALITY_TOKEN
INITIAL_TIMESTAMP
);

return List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

package org.apache.ignite.internal.catalog.descriptors;

import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_TIMESTAMP;

import java.util.List;
import java.util.Objects;
import org.apache.ignite.internal.catalog.storage.serialization.MarshallableEntryType;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tostring.S;

/** Hash index descriptor. */
Expand All @@ -40,7 +41,7 @@ public class CatalogHashIndexDescriptor extends CatalogIndexDescriptor {
* @throws IllegalArgumentException If columns list contains duplicates.
*/
public CatalogHashIndexDescriptor(int id, String name, int tableId, boolean unique, List<String> columns) {
this(id, name, tableId, unique, CatalogIndexStatus.REGISTERED, columns, INITIAL_CAUSALITY_TOKEN, false);
this(id, name, tableId, unique, CatalogIndexStatus.REGISTERED, columns, INITIAL_TIMESTAMP, false);
}

/**
Expand All @@ -65,7 +66,7 @@ public CatalogHashIndexDescriptor(
List<String> columns,
boolean isCreatedWithTable
) {
this(id, name, tableId, unique, status, columns, INITIAL_CAUSALITY_TOKEN, isCreatedWithTable);
this(id, name, tableId, unique, status, columns, INITIAL_TIMESTAMP, isCreatedWithTable);
}

/**
Expand All @@ -77,7 +78,7 @@ public CatalogHashIndexDescriptor(
* @param unique Unique flag.
* @param status Index status.
* @param columns A list of indexed columns. Must not contains duplicates.
* @param causalityToken Token of the update of the descriptor.
* @param timestamp Timestamp of the update of the descriptor.
* @param isCreatedWithTable Flag indicating that this index has been created at the same time as its table.
*
* @throws IllegalArgumentException If columns list contains duplicates.
Expand All @@ -89,10 +90,10 @@ public CatalogHashIndexDescriptor(
boolean unique,
CatalogIndexStatus status,
List<String> columns,
long causalityToken,
HybridTimestamp timestamp,
boolean isCreatedWithTable
) {
super(CatalogIndexDescriptorType.HASH, id, name, tableId, unique, status, causalityToken, isCreatedWithTable);
super(CatalogIndexDescriptorType.HASH, id, name, tableId, unique, status, timestamp, isCreatedWithTable);

this.columns = List.copyOf(Objects.requireNonNull(columns, "columns"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.ignite.internal.catalog.descriptors;

import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_TIMESTAMP;
import static org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.readStringCollection;
import static org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.writeStringCollection;
import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -27,6 +30,7 @@
import org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectDataOutput;
import org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectSerializer;
import org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;

Expand All @@ -43,21 +47,25 @@ static class HashIndexDescriptorSerializerV1 implements CatalogObjectSerializer<
public CatalogHashIndexDescriptor readFrom(CatalogObjectDataInput input) throws IOException {
int id = input.readVarIntAsInt();
String name = input.readUTF();
long updateToken = input.readVarInt();

// Read the update token.
input.readVarInt();

int tableId = input.readVarIntAsInt();
boolean unique = input.readBoolean();
CatalogIndexStatus status = CatalogIndexStatus.forId(input.readByte());
boolean isCreatedWithTable = input.readBoolean();
List<String> columns = readStringCollection(input, ArrayList::new);

return new CatalogHashIndexDescriptor(id, name, tableId, unique, status, columns, updateToken, isCreatedWithTable);
// Here we use the initial timestamp because it's old storage. This value will be processed by data nodes manager.
return new CatalogHashIndexDescriptor(id, name, tableId, unique, status, columns, INITIAL_TIMESTAMP, isCreatedWithTable);
}

@Override
public void writeTo(CatalogHashIndexDescriptor descriptor, CatalogObjectDataOutput output) throws IOException {
output.writeVarInt(descriptor.id());
output.writeUTF(descriptor.name());
output.writeVarInt(descriptor.updateToken());
output.writeVarInt(descriptor.updateTimestamp().longValue());
output.writeVarInt(descriptor.tableId());
output.writeBoolean(descriptor.unique());
output.writeByte(descriptor.status().id());
Expand All @@ -72,21 +80,22 @@ static class HashIndexDescriptorSerializerV2 implements CatalogObjectSerializer<
public CatalogHashIndexDescriptor readFrom(CatalogObjectDataInput input) throws IOException {
int id = input.readVarIntAsInt();
String name = input.readUTF();
long updateToken = input.readVarInt();
long updateTimestampLong = input.readVarInt();
HybridTimestamp updateTimestamp = updateTimestampLong == 0 ? MIN_VALUE : hybridTimestamp(updateTimestampLong);
int tableId = input.readVarIntAsInt();
boolean unique = input.readBoolean();
CatalogIndexStatus status = CatalogIndexStatus.forId(input.readByte());
boolean isCreatedWithTable = input.readBoolean();
List<String> columns = input.readObjectCollection(IgniteUnsafeDataInput::readUTF, ArrayList::new);

return new CatalogHashIndexDescriptor(id, name, tableId, unique, status, columns, updateToken, isCreatedWithTable);
return new CatalogHashIndexDescriptor(id, name, tableId, unique, status, columns, updateTimestamp, isCreatedWithTable);
}

@Override
public void writeTo(CatalogHashIndexDescriptor value, CatalogObjectDataOutput output) throws IOException {
output.writeVarInt(value.id());
output.writeUTF(value.name());
output.writeVarInt(value.updateToken());
output.writeVarInt(value.updateTimestamp().longValue());
output.writeVarInt(value.tableId());
output.writeBoolean(value.unique());
output.writeByte(value.status().id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Objects;
import org.apache.ignite.internal.catalog.storage.serialization.MarshallableEntry;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tostring.S;

/** Index descriptor base class. */
Expand Down Expand Up @@ -54,10 +55,10 @@ public abstract class CatalogIndexDescriptor extends CatalogObjectDescriptor imp
int tableId,
boolean unique,
CatalogIndexStatus status,
long causalityToken,
HybridTimestamp timestamp,
boolean createdWithTable
) {
super(id, Type.INDEX, name, causalityToken);
super(id, Type.INDEX, name, timestamp);
this.indexType = indexType;
this.tableId = tableId;
this.unique = unique;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.ignite.internal.catalog.descriptors;

import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_TIMESTAMP;

import java.util.Objects;
import org.apache.ignite.internal.catalog.storage.UpdateEntry;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tostring.S;

/**
Expand All @@ -30,13 +31,13 @@ public abstract class CatalogObjectDescriptor {
private final int id;
private final String name;
private final Type type;
private long updateToken;
private HybridTimestamp updateTimestamp;

CatalogObjectDescriptor(int id, Type type, String name, long causalityToken) {
CatalogObjectDescriptor(int id, Type type, String name, HybridTimestamp timestamp) {
this.id = id;
this.type = Objects.requireNonNull(type, "type");
this.name = Objects.requireNonNull(name, "name");
this.updateToken = causalityToken;
this.updateTimestamp = timestamp;
}

/** Returns id of the described object. */
Expand All @@ -55,29 +56,29 @@ public Type type() {
}

/**
* Token of the update of the descriptor.
* Updated when {@link UpdateEntry#applyUpdate(org.apache.ignite.internal.catalog.Catalog, long)} is called for the
* corresponding catalog descriptor. This token is the token that is associated with the corresponding update being applied to
* Timestamp of the update of the descriptor.
* Updated when {@link UpdateEntry#applyUpdate(org.apache.ignite.internal.catalog.Catalog, HybridTimestamp)} is called for the
* corresponding catalog descriptor. This timestamp is the timestamp that is associated with the corresponding update being applied to
* the Catalog. Any new catalog descriptor associated with an {@link UpdateEntry}, meaning that this token is set only once.
*
* @return Token of the update of the descriptor.
* @return timestamp of the update of the descriptor.
*/
public long updateToken() {
return updateToken;
public HybridTimestamp updateTimestamp() {
return updateTimestamp;
}

/**
* Set token of the update of the descriptor. Must be called only once when
* {@link UpdateEntry#applyUpdate(org.apache.ignite.internal.catalog.Catalog, long)} is called for the corresponding catalog descriptor.
* This token is the token that is associated with the corresponding update being applied to
* Set timestamp of the update of the descriptor. Must be called only once when
* {@link UpdateEntry#applyUpdate(org.apache.ignite.internal.catalog.Catalog, HybridTimestamp)} is called for the corresponding catalog
* descriptor. This timestamp is the timestamp that is associated with the corresponding update being applied to
* the Catalog. Any new catalog descriptor associated with an {@link UpdateEntry}, meaning that this token is set only once.
*
* @param updateToken Update token of the descriptor.
* @param updateTimestamp Update timestamp of the descriptor.
*/
public void updateToken(long updateToken) {
assert this.updateToken == INITIAL_CAUSALITY_TOKEN : "Update token for the descriptor must be updated only once";
public void updateTimestamp(HybridTimestamp updateTimestamp) {
assert this.updateTimestamp.equals(INITIAL_TIMESTAMP) : "Update timestamp for the descriptor must be updated only once";

this.updateToken = updateToken;
this.updateTimestamp = updateTimestamp;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.function.Function;
import org.apache.ignite.internal.catalog.storage.serialization.MarshallableEntry;
import org.apache.ignite.internal.catalog.storage.serialization.MarshallableEntryType;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -54,8 +55,9 @@ public CatalogSchemaDescriptor(int id, String name,
CatalogTableDescriptor[] tables,
CatalogIndexDescriptor[] indexes,
CatalogSystemViewDescriptor[] systemViews,
long causalityToken) {
super(id, Type.SCHEMA, name, causalityToken);
HybridTimestamp timestamp
) {
super(id, Type.SCHEMA, name, timestamp);
this.tables = Objects.requireNonNull(tables, "tables");
this.indexes = Objects.requireNonNull(indexes, "indexes");
this.systemViews = Objects.requireNonNull(systemViews, "systemViews");
Expand Down
Loading