Skip to content

Commit a5cfb4a

Browse files
baiyangzhuhongbaiyangzhuhong
and
baiyangzhuhong
authored
feature: Support New Sharding Strategy To Run Balanceful On Multiple Servers For Single Sharding Scenario (#2462)
* feature: #2461 * fix problems of code formatting validation * remove some comments --------- Co-authored-by: baiyangzhuhong <baiyangzhuhong@163.com>
1 parent 88378d2 commit a5cfb4a

File tree

10 files changed

+994
-152
lines changed

10 files changed

+994
-152
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.elasticjob.kernel.executor.facade;
19+
20+
import com.google.common.base.Strings;
21+
22+
import java.util.Collection;
23+
import java.util.Comparator;
24+
import java.util.stream.Collectors;
25+
26+
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
27+
import org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException;
28+
import org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService;
29+
import org.apache.shardingsphere.elasticjob.kernel.internal.context.TaskContext;
30+
import org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService;
31+
import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService;
32+
import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService;
33+
import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService;
34+
import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration;
35+
import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus;
36+
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
37+
import org.apache.shardingsphere.elasticjob.spi.executor.item.param.JobRuntimeService;
38+
import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener;
39+
import org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts;
40+
import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobExecutionEvent;
41+
import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent;
42+
import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent.State;
43+
44+
import lombok.extern.slf4j.Slf4j;
45+
46+
/**
47+
* Abstract Job facade.
48+
*/
49+
@Slf4j
50+
abstract class AbstractJobFacade implements JobFacade {
51+
52+
private final ConfigurationService configService;
53+
54+
private final ShardingService shardingService;
55+
56+
private final ExecutionContextService executionContextService;
57+
58+
private final ExecutionService executionService;
59+
60+
private final FailoverService failoverService;
61+
62+
private final Collection<ElasticJobListener> elasticJobListeners;
63+
64+
private final JobTracingEventBus jobTracingEventBus;
65+
66+
AbstractJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection<ElasticJobListener> elasticJobListeners, final TracingConfiguration<?> tracingConfig) {
67+
configService = new ConfigurationService(regCenter, jobName);
68+
shardingService = new ShardingService(regCenter, jobName);
69+
executionContextService = new ExecutionContextService(regCenter, jobName);
70+
executionService = new ExecutionService(regCenter, jobName);
71+
failoverService = new FailoverService(regCenter, jobName);
72+
this.elasticJobListeners = elasticJobListeners.stream().sorted(Comparator.comparingInt(ElasticJobListener::order)).collect(Collectors.toList());
73+
this.jobTracingEventBus = null == tracingConfig ? new JobTracingEventBus() : new JobTracingEventBus(tracingConfig);
74+
}
75+
76+
/**
77+
* Load job configuration.
78+
*
79+
* @param fromCache load from cache or not
80+
* @return job configuration
81+
*/
82+
@Override
83+
public JobConfiguration loadJobConfiguration(final boolean fromCache) {
84+
return configService.load(fromCache);
85+
}
86+
87+
/**
88+
* Check job execution environment.
89+
*
90+
* @throws org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException job execution environment exception
91+
*/
92+
@Override
93+
public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException {
94+
configService.checkMaxTimeDiffSecondsTolerable();
95+
}
96+
97+
/**
98+
* Failover If necessary.
99+
*/
100+
@Override
101+
public void failoverIfNecessary() {
102+
if (configService.load(true).isFailover()) {
103+
failoverService.failoverIfNecessary();
104+
}
105+
}
106+
107+
/**
108+
* Register job begin.
109+
*
110+
* @param shardingContexts sharding contexts
111+
*/
112+
@Override
113+
public void registerJobBegin(final ShardingContexts shardingContexts) {
114+
executionService.registerJobBegin(shardingContexts);
115+
}
116+
117+
/**
118+
* Register job completed.
119+
*
120+
* @param shardingContexts sharding contexts
121+
*/
122+
@Override
123+
public void registerJobCompleted(final ShardingContexts shardingContexts) {
124+
executionService.registerJobCompleted(shardingContexts);
125+
if (configService.load(true).isFailover()) {
126+
failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());
127+
}
128+
}
129+
130+
public abstract ShardingContexts getShardingContexts();
131+
132+
/**
133+
* Set task misfire flag.
134+
*
135+
* @param shardingItems sharding items to be set misfire flag
136+
* @return whether satisfy misfire condition
137+
*/
138+
@Override
139+
public boolean misfireIfRunning(final Collection<Integer> shardingItems) {
140+
return executionService.misfireIfHasRunningItems(shardingItems);
141+
}
142+
143+
/**
144+
* Clear misfire flag.
145+
*
146+
* @param shardingItems sharding items to be cleared misfire flag
147+
*/
148+
@Override
149+
public void clearMisfire(final Collection<Integer> shardingItems) {
150+
executionService.clearMisfire(shardingItems);
151+
}
152+
153+
/**
154+
* Judge job whether to need to execute misfire tasks.
155+
*
156+
* @param shardingItems sharding items
157+
* @return need to execute misfire tasks or not
158+
*/
159+
@Override
160+
public boolean isExecuteMisfired(final Collection<Integer> shardingItems) {
161+
return configService.load(true).isMisfire() && !isNeedSharding() && !executionService.getMisfiredJobItems(shardingItems).isEmpty();
162+
}
163+
164+
/**
165+
* Judge job whether to need resharding.
166+
*
167+
* @return need resharding or not
168+
*/
169+
@Override
170+
public boolean isNeedSharding() {
171+
return shardingService.isNeedSharding();
172+
}
173+
174+
/**
175+
* Call before job executed.
176+
*
177+
* @param shardingContexts sharding contexts
178+
*/
179+
@Override
180+
public void beforeJobExecuted(final ShardingContexts shardingContexts) {
181+
for (ElasticJobListener each : elasticJobListeners) {
182+
each.beforeJobExecuted(shardingContexts);
183+
}
184+
}
185+
186+
/**
187+
* Call after job executed.
188+
*
189+
* @param shardingContexts sharding contexts
190+
*/
191+
@Override
192+
public void afterJobExecuted(final ShardingContexts shardingContexts) {
193+
for (ElasticJobListener each : elasticJobListeners) {
194+
each.afterJobExecuted(shardingContexts);
195+
}
196+
}
197+
198+
/**
199+
* Post job execution event.
200+
*
201+
* @param jobExecutionEvent job execution event
202+
*/
203+
@Override
204+
public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
205+
jobTracingEventBus.post(jobExecutionEvent);
206+
}
207+
208+
/**
209+
* Post job status trace event.
210+
*
211+
* @param taskId task Id
212+
* @param state job state
213+
* @param message job message
214+
*/
215+
@Override
216+
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
217+
TaskContext taskContext = TaskContext.from(taskId);
218+
jobTracingEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
219+
taskContext.getSlaveId(), taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
220+
if (!Strings.isNullOrEmpty(message)) {
221+
log.trace(message);
222+
}
223+
}
224+
225+
/**
226+
* Get job runtime service.
227+
*
228+
* @return job runtime service
229+
*/
230+
@Override
231+
public JobRuntimeService getJobRuntimeService() {
232+
return new JobJobRuntimeServiceImpl(this);
233+
}
234+
}

0 commit comments

Comments
 (0)