This is the final article in the series analyzing the Apache SeaTunnel Zeta engine source code; review the earlier articles in the series to get the full picture.
Let's review the components that execute after the server starts:
These components run while the cluster has received no tasks. But when a client sends a SeaTunnelSubmitJobCodec message to the server, how does the server handle it?
Message Reception
Since the client and server are on different machines, direct method calls cannot be used; instead, message passing is employed. Upon receiving a message, how does the server process it?
First, the client sends a message of type SeaTunnelSubmitJobCodec:
    // Client code
    ClientMessage request =
            SeaTunnelSubmitJobCodec.encodeRequest(
                    jobImmutableInformation.getJobId(),
                    seaTunnelHazelcastClient
                            .getSerializationService()
                            .toData(jobImmutableInformation),
                    jobImmutableInformation.isStartWithSavePoint());
    PassiveCompletableFuture
The SeaTunnelSubmitJobCodec class is associated with the SeaTunnelMessageTaskFactoryProvider class, which maps message types to MessageTask classes. SeaTunnelSubmitJobCodec maps to the SubmitJobTask class:
    private final Int2ObjectHashMap
The SubmitJobTask class, in turn, invokes the SubmitJobOperation class:
    @Override
    protected Operation prepareOperation() {
        return new SubmitJobOperation(
                parameters.jobId,
                parameters.jobImmutableInformation,
                parameters.isStartWithSavePoint);
    }
In the SubmitJobOperation class, the task information is handed over to the CoordinatorService component via its submitJob method:
    @Override
    protected PassiveCompletableFuture<?> doRun() throws Exception {
        SeaTunnelServer seaTunnelServer = getService();
        return seaTunnelServer
                .getCoordinatorService()
                .submitJob(jobId, jobImmutableInformation, isStartWithSavePoint);
    }
At this point, the client message has effectively been turned into a method invocation on the server. Other types of operations can be traced in a similar manner.
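The chain traced above (message type, then task, then operation, then service call) can be illustrated with a small self-contained sketch. Every name in it (DispatchSketch, Coordinator, the message type id 1) is a hypothetical stand-in chosen for illustration, not a SeaTunnel or Hazelcast class, and the real provider registers many more message types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

// Hypothetical, simplified stand-ins: none of these types are SeaTunnel or Hazelcast classes.
class DispatchSketch {

    /** Server-side service that finally performs the work (stand-in for CoordinatorService). */
    static class Coordinator {
        String submitJob(long jobId) {
            return "submitted job " + jobId;
        }
    }

    /** A unit of server-side work (stand-in for an Operation such as SubmitJobOperation). */
    interface Operation {
        String run(Coordinator coordinator);
    }

    /** Turns a decoded request into an Operation (stand-in for a MessageTask such as SubmitJobTask). */
    interface MessageTask {
        Operation prepareOperation();
    }

    // Assumed message type id; the real value is defined by the codec class.
    static final int SUBMIT_JOB_MESSAGE_TYPE = 1;

    // Maps a message type id to a factory that builds the task for a given job id.
    private final Map<Integer, LongFunction<MessageTask>> factories = new HashMap<>();
    private final Coordinator coordinator = new Coordinator();

    DispatchSketch() {
        factories.put(
                SUBMIT_JOB_MESSAGE_TYPE,
                jobId -> {
                    // The task only prepares the operation; the operation calls the service.
                    MessageTask task = () -> (coord -> coord.submitJob(jobId));
                    return task;
                });
    }

    /** Looks up the task for the message type, prepares its operation, and runs it. */
    String handle(int messageType, long jobId) {
        LongFunction<MessageTask> factory = factories.get(messageType);
        if (factory == null) {
            throw new IllegalArgumentException("No task registered for message type " + messageType);
        }
        MessageTask task = factory.apply(jobId);
        Operation operation = task.prepareOperation();
        return operation.run(coordinator);
    }

    public static void main(String[] args) {
        System.out.println(new DispatchSketch().handle(SUBMIT_JOB_MESSAGE_TYPE, 42L));
    }
}
```

The point of the indirection is that the network layer only needs to know the message type id; everything specific to job submission lives in the task and the operation.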
CoordinatorService
Next, let's explore how CoordinatorService handles job submissions:
    public PassiveCompletableFuture
On the server, a JobMaster object is created to manage each individual task. While creating the JobMaster, the service retrieves the resource manager via getResourceManager() and the job history information via getJobHistoryService(). The jobHistoryService is initialized at startup, while the ResourceManager is lazily loaded upon the first task submission:
    /** Lazy load for resource manager */
    public ResourceManager getResourceManager() {
        if (resourceManager == null) {
            synchronized (this) {
                if (resourceManager == null) {
                    ResourceManager manager =
                            new ResourceManagerFactory(nodeEngine, engineConfig)
                                    .getResourceManager();
                    manager.init();
                    resourceManager = manager;
                }
            }
        }
        return resourceManager;
    }
ResourceManager
Currently, SeaTunnel supports only standalone deployment. When initializing ResourceManager, it gathers all cluster nodes and sends a SyncWorkerProfileOperation to get node information, updating the internal registerWorker state:
    @Override
    public void init() {
        log.info("Init ResourceManager");
        initWorker();
    }

    private void initWorker() {
        log.info("initWorker... ");
        List<Address> aliveNode =
                nodeEngine.getClusterService().getMembers().stream()
                        .map(Member::getAddress)
                        .collect(Collectors.toList());
        log.info("init live nodes: {}", aliveNode);
        List
As we saw previously, the SlotService on each node periodically sends heartbeat messages to the master. Upon receiving these heartbeats, the ResourceManager updates the node statuses in its internal state.
    @Override
    public void heartbeat(WorkerProfile workerProfile) {
        if (!registerWorker.containsKey(workerProfile.getAddress())) {
            log.info("received new worker register: " + workerProfile.getAddress());
            sendToMember(new ResetResourceOperation(), workerProfile.getAddress()).join();
        } else {
            log.debug("received worker heartbeat from: " + workerProfile.getAddress());
        }
        registerWorker.put(workerProfile.getAddress(), workerProfile);
    }
JobMaster
In the CoordinatorService, a JobMaster instance is created and its init method is called. Once init completes, the task is considered to have been created successfully. The run method is then called to actually execute the task.
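The two-phase lifecycle described here (init first, then run) can be sketched as follows. This is only an illustration under assumptions: the Job and Submission classes and the two futures are hypothetical names, not the SeaTunnel types, and the real CoordinatorService does considerably more bookkeeping around each step.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-ins; these are not the SeaTunnel classes.
class JobLifecycleSketch {

    /** Records the order in which its lifecycle methods are called. */
    static class Job {
        final List<String> events = Collections.synchronizedList(new ArrayList<>());

        void init() {
            events.add("init");
        }

        void run() {
            events.add("run");
        }
    }

    /** Holds one future per phase so callers can wait for either. */
    static class Submission {
        final CompletableFuture<Void> created = new CompletableFuture<>();
        final CompletableFuture<Void> finished = new CompletableFuture<>();
    }

    static Submission submit(Job job, ExecutorService executor) {
        Submission submission = new Submission();
        executor.execute(
                () -> {
                    try {
                        job.init();
                        // Creation counts as successful as soon as init returns.
                        submission.created.complete(null);
                        job.run();
                        submission.finished.complete(null);
                    } catch (Throwable t) {
                        // Completing an already completed future is a no-op.
                        submission.created.completeExceptionally(t);
                        submission.finished.completeExceptionally(t);
                    }
                });
        return submission;
    }

    static List<String> demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Job job = new Job();
            Submission submission = submit(job, executor);
            submission.created.join();
            submission.finished.join();
            return new ArrayList<>(job.events);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Separating the two futures lets a caller be told that submission succeeded without having to wait for the whole job to finish.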
Let's look at the constructor and the init method.
    public JobMaster(
            @NonNull Data jobImmutableInformationData,
            @NonNull NodeEngine nodeEngine,
            @NonNull ExecutorService executorService,
            @NonNull ResourceManager resourceManager,
            @NonNull JobHistoryService jobHistoryService,
            @NonNull IMap runningJobStateIMap,
            @NonNull IMap runningJobStateTimestampsIMap,
            @NonNull IMap ownedSlotProfilesIMap,
            @NonNull IMap
The constructor performs only simple variable assignments and no significant work, so we need to focus on the init method.
    public synchronized void init(long initializationTimestamp, boolean restart) throws Exception {
        // The server receives a binary object from the client and first converts it
        // back into the same JobImmutableInformation object that the client sent
        jobImmutableInformation =
                nodeEngine.getSerializationService().toObject(jobImmutableInformationData);
        // Get the checkpoint configuration, such as the interval and timeout
        jobCheckpointConfig =
                createJobCheckpointConfig(
                        engineConfig.getCheckpointConfig(),
                        jobImmutableInformation.getJobConfig());

        LOGGER.info(
                String.format(
                        "Init JobMaster for Job %s (%s) ",
                        jobImmutableInformation.getJobConfig().getName(),
                        jobImmutableInformation.getJobId()));
        LOGGER.info(
                String.format(
                        "Job %s (%s) needed jar urls %s",
                        jobImmutableInformation.getJobConfig().getName(),
                        jobImmutableInformation.getJobId(),
                        jobImmutableInformation.getPluginJarsUrls()));
        ClassLoader appClassLoader = Thread.currentThread().getContextClassLoader();
        // Get the ClassLoader
        ClassLoader classLoader =
                seaTunnelServer
                        .getClassLoaderService()
                        .getClassLoader(
                                jobImmutableInformation.getJobId(),
                                jobImmutableInformation.getPluginJarsUrls());
        // Deserialize the logical DAG from the client-provided information
        logicalDag =
                CustomClassLoadedObject.deserializeWithCustomClassLoader(
                        nodeEngine.getSerializationService(),
                        classLoader,
                        jobImmutableInformation.getLogicalDag());
        try {
            Thread.currentThread().setContextClassLoader(classLoader);
            // Execute save mode functionality, such as table creation and deletion
            if (!restart
                    && !logicalDag.isStartWithSavePoint()
                    && ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
                            .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
                            .equals(SaveModeExecuteLocation.CLUSTER)) {
                logicalDag.getLogicalVertexMap().values().stream()
                        .map(LogicalVertex::getAction)
                        .filter(action -> action instanceof SinkAction)
                        .map(sink -> ((SinkAction<?, ?, ?, ?>) sink).getSink())
                        .forEach(JobMaster::handleSaveMode);
            }
            // Parse the logical plan into a physical plan
            final Tuple2
Lastly, let's look at the run method:
    public void run() {
        try {
            physicalPlan.startJob();
        } catch (Throwable e) {
            LOGGER.severe(
                    String.format(
                            "Job %s (%s) run error with: %s",
                            physicalPlan.getJobImmutableInformation().getJobConfig().getName(),
                            physicalPlan.getJobImmutableInformation().getJobId(),
                            ExceptionUtils.getMessage(e)));
        } finally {
            jobMasterCompleteFuture.join();
            if (engineConfig.getConnectorJarStorageConfig().getEnable()) {
                List
This method is relatively straightforward: it calls physicalPlan.startJob() to execute the generated physical plan.
From the above code, it is evident that after the server receives a client task submission request, it initializes the JobMaster class, which generates the physical plan from the logical plan and then executes the physical plan.
Next, we need to delve into how the logical plan is converted into a physical plan.
Conversion From Logical Plan to Physical Plan
The generation of the physical plan is done by calling the following method in JobMaster:
    final Tuple2
The method that generates the physical plan first converts the logical plan into an execution plan, and then converts the execution plan into a physical plan.
    public static Tuple2
The ExecutionPlanGenerator class takes a logical plan and produces an execution plan through a series of steps, including generating execution edges, shuffle edges, transform chain edges, and finally, pipelines.
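Of the steps just listed, transform chaining is the easiest to picture in isolation. The sketch below is a minimal illustration under an assumed rule: two transforms are chained when the upstream one has exactly one successor and the downstream one has exactly one predecessor. The class name, the string-based graph representation, and the rule itself are assumptions made for this example and are not taken from the ExecutionPlanGenerator source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; vertices are plain strings rather than SeaTunnel vertex objects.
class TransformChainSketch {

    /** Returns groups of transforms that can be fused, each in upstream-to-downstream order. */
    static List<List<String>> chains(Map<String, List<String>> successors, Set<String> transforms) {
        // Count incoming edges per vertex.
        Map<String, Integer> inDegree = new HashMap<>();
        for (List<String> targets : successors.values()) {
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        // next.get(t) is the transform that t can be chained with, if any.
        Map<String, String> next = new HashMap<>();
        for (String t : transforms) {
            List<String> targets = successors.getOrDefault(t, Collections.emptyList());
            if (targets.size() == 1) {
                String s = targets.get(0);
                if (transforms.contains(s) && inDegree.getOrDefault(s, 0) == 1) {
                    next.put(t, s);
                }
            }
        }
        // A chain starts at every transform that is not the chained successor of another one.
        Set<String> chained = new HashSet<>(next.values());
        List<List<String>> result = new ArrayList<>();
        for (String head : new TreeSet<>(transforms)) {
            if (chained.contains(head)) {
                continue;
            }
            List<String> chain = new ArrayList<>();
            for (String cur = head; cur != null; cur = next.get(cur)) {
                chain.add(cur);
            }
            result.add(chain);
        }
        return result;
    }

    /** source -> t1 -> t2 -> {t3, sink1}, t3 -> sink2 */
    static List<List<String>> demo() {
        Map<String, List<String>> successors = new HashMap<>();
        successors.put("source", Arrays.asList("t1"));
        successors.put("t1", Arrays.asList("t2"));
        successors.put("t2", Arrays.asList("t3", "sink1"));
        successors.put("t3", Arrays.asList("sink2"));
        return chains(successors, new HashSet<>(Arrays.asList("t1", "t2", "t3")));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the demo graph, t1 and t2 form one chain, while t3 stays on its own because t2 fans out to two successors.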
Generation of the Physical Plan
The PhysicalPlanGenerator class converts the execution plan into a physical plan:
    public PhysicalPlanGenerator(
            @NonNull ExecutionPlan executionPlan,
            @NonNull NodeEngine nodeEngine,
            @NonNull JobImmutableInformation jobImmutableInformation,
            long initializationTimestamp,
            @NonNull ExecutorService executorService,
            @NonNull FlakeIdGenerator flakeIdGenerator,
            @NonNull IMap runningJobStateIMap,
            @NonNull IMap runningJobStateTimestampsIMap,
            @NonNull QueueType queueType) {
        this.executionPlan = executionPlan;
        this.nodeEngine = nodeEngine;
        this.jobImmutableInformation = jobImmutableInformation;
        this.initializationTimestamp = initializationTimestamp;
        this.executorService = executorService;
        this.flakeIdGenerator = flakeIdGenerator;
        this.runningJobStateIMap = runningJobStateIMap;
        this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
        this.queueType = queueType;
    }

    public PhysicalPlan generate() {
        List
Let's examine the contents of these two classes.
    public class ExecutionPlan {
        private final List
Let's compare it with the logical plan:
    public class LogicalDag implements IdentifiedDataSerializable {
        @Getter private JobConfig jobConfig;
        private final Set
It seems that each Pipeline resembles a logical plan. Why do we need this transformation step? Let's take a closer look at the process of generating an execution plan.
As shown above, generating an execution plan involves five steps, which we will review one by one.
The Shuffle step addresses specific scenarios where the source supports reading multiple tables, and there are multiple sink nodes depending on this source. In such cases, a shuffle node is added in between.
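As a rough picture of that rewrite, the sketch below inserts a shuffle vertex between a multi-table source and its downstream nodes. It is a simplified illustration only: the vertex names, the tableCount map, and the "shuffle[...]" naming are hypothetical, and the actual engine works on its own vertex and edge objects rather than strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not the engine's shuffle implementation.
class ShuffleSketch {

    /**
     * Rewrites the graph so that a vertex producing more than one table and feeding more than
     * one downstream node sends its output through a single shuffle vertex instead.
     */
    static Map<String, List<String>> addShuffle(
            Map<String, List<String>> successors, Map<String, Integer> tableCount) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : successors.entrySet()) {
            String vertex = entry.getKey();
            List<String> targets = entry.getValue();
            boolean multiTable = tableCount.getOrDefault(vertex, 0) > 1;
            if (multiTable && targets.size() > 1) {
                String shuffle = "shuffle[" + vertex + "]";
                // source -> shuffle, then shuffle -> every original target
                result.put(vertex, Collections.singletonList(shuffle));
                result.put(shuffle, new ArrayList<>(targets));
            } else {
                result.put(vertex, new ArrayList<>(targets));
            }
        }
        return result;
    }

    /** A two-table source with two sinks, and a single-table source with two sinks. */
    static Map<String, List<String>> demo() {
        Map<String, List<String>> successors = new LinkedHashMap<>();
        successors.put("multiSource", Arrays.asList("sinkA", "sinkB"));
        successors.put("singleSource", Arrays.asList("sinkC", "sinkD"));
        Map<String, Integer> tableCount = new LinkedHashMap<>();
        tableCount.put("multiSource", 2);
        tableCount.put("singleSource", 1);
        return addShuffle(successors, tableCount);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the multi-table source gains a shuffle vertex; the single-table source keeps its direct edges.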
Step 3
    private Set
Step five involves generating the execution plan instance, passing in the Pipeline parameters generated in step four.
Summary:
The execution plan performs the following tasks on the logical plan:
It adds a shuffle node in between when a source generates multiple tables and multiple sink nodes depend on this source.
It attempts to chain-merge transform nodes, combining multiple transform nodes into one node.
It splits the tasks, dividing a configuration file/LogicalDag into several unrelated tasks represented as a List of pipelines.
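To make the last point concrete, here is a minimal sketch of splitting a job graph into unrelated pipelines. It assumes that "unrelated" means "not connected by any edge", so each pipeline is one connected component of the graph when edge direction is ignored. The class name and string-based vertices are hypothetical, and the engine's own pipeline generation may apply further rules.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch; a pipeline here is just a sorted set of vertex names.
class PipelineSplitSketch {

    static List<Set<String>> split(Map<String, List<String>> successors) {
        // Build an undirected view of the graph so that traversal ignores edge direction.
        Map<String, Set<String>> neighbors = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : successors.entrySet()) {
            String from = entry.getKey();
            neighbors.computeIfAbsent(from, k -> new TreeSet<>());
            for (String to : entry.getValue()) {
                neighbors.get(from).add(to);
                neighbors.computeIfAbsent(to, k -> new TreeSet<>()).add(from);
            }
        }
        // Breadth-first search from every unvisited vertex yields one component per pipeline.
        List<Set<String>> pipelines = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        for (String start : neighbors.keySet()) {
            if (!visited.add(start)) {
                continue;
            }
            Set<String> pipeline = new TreeSet<>();
            Deque<String> queue = new ArrayDeque<>();
            queue.add(start);
            while (!queue.isEmpty()) {
                String vertex = queue.poll();
                pipeline.add(vertex);
                for (String neighbor : neighbors.get(vertex)) {
                    if (visited.add(neighbor)) {
                        queue.add(neighbor);
                    }
                }
            }
            pipelines.add(pipeline);
        }
        return pipelines;
    }

    /** Two independent flows defined in one job: A (with a transform) and B. */
    static List<Set<String>> demo() {
        Map<String, List<String>> successors = new HashMap<>();
        successors.put("sourceA", Arrays.asList("transformA"));
        successors.put("transformA", Arrays.asList("sinkA"));
        successors.put("sourceB", Arrays.asList("sinkB"));
        return split(successors);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the demo input, the A flow and the B flow end up in separate pipelines because no edge connects them.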
Physical Plan Generation
Before delving into physical plan generation, let's first review what information is included in the generated physical plan and examine its internal components.
    public class PhysicalPlan {
        private final List