Flink CDC version: 3.2.1

Note: this article starts from the point where SchemaOperator receives a schema change event. The schema change event itself is produced on the source side, which is not covered here. You may want to look at the flowchart first and then study the source code.

Reference article: Flink cdc3.0动态变更表结构——源码解析
1. Source Code Analysis
We take Sink to Doris as the example.
SchemaOperator
org.apache.flink.cdc.runtime.operators.schema.SchemaOperator

processElement checks whether the incoming event is a SchemaChangeEvent and, if so, calls processSchemaChangeEvents:
```java
/**
 * This method is guaranteed to not be called concurrently with other methods of the operator.
 */
@Override
public void processElement(StreamRecord<Event> streamRecord)
        throws InterruptedException, TimeoutException, ExecutionException {
    Event event = streamRecord.getValue();
    if (event instanceof SchemaChangeEvent) { // (0)
        processSchemaChangeEvents((SchemaChangeEvent) event);
    } else if (event instanceof DataChangeEvent) { // (13)
        processDataChangeEvents(streamRecord, (DataChangeEvent) event);
    } else {
        throw new RuntimeException("Unknown event type in Stream record: " + event);
    }
}
```
processSchemaChangeEvents then calls handleSchemaChangeEvent:
```java
private void processSchemaChangeEvents(SchemaChangeEvent event)
        throws InterruptedException, TimeoutException, ExecutionException {
    TableId tableId = event.tableId();
    LOG.info(
            "{}> Table {} received SchemaChangeEvent {} and start to be blocked.",
            subTaskId,
            tableId,
            event);
    handleSchemaChangeEvent(tableId, event);
    // Update caches
    originalSchema.put(tableId, getLatestOriginalSchema(tableId));
    schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId));
    List<TableId> optionalRoutedTable = getRoutedTables(tableId);
    if (!optionalRoutedTable.isEmpty()) {
        tableIdMappingCache
                .get(tableId)
                .forEach(routed -> evolvedSchema.put(routed, getLatestEvolvedSchema(routed)));
    } else {
        evolvedSchema.put(tableId, getLatestEvolvedSchema(tableId));
    }
}
```
handleSchemaChangeEvent calls requestSchemaChange to ask the coordinator to perform the schema change.
response.isAccepted() means the registry (coordinator) has accepted the change request. Inside that if branch comes the key line: output.collect(new StreamRecord<>(new FlushEvent(tableId)));. Note that a new FlushEvent(tableId) is emitted here. This event is later used by the SinkWriter: it tells the SinkWriter to flush, that is, to write its buffered data into the sink database, similar to a JDBC commit. The content of FlushEvent is minimal, only a tableId, but its type is FlushEvent, and the Javadoc of that class reads:

"An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it start flushing."

In other words, FlushEvent travels through the stream as a special marker: the DataSinkWriterOperator that receives it triggers its flushing, writing all data received so far into the target database. The stream can be pictured as: data with the new schema -- FlushEvent (newly inserted) -- data with the old schema.
After emitting the FlushEvent, the operator calls requestSchemaChangeResult, which blocks in a while loop: in short, it waits until all writers have finished writing the old-schema data that precedes the FlushEvent, and until then it does not send any new-schema data downstream.
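To make the barrier semantics concrete, here is a tiny self-contained simulation of the idea. This is not Flink CDC code and all names are invented: a marker element separates old-schema rows from new-schema rows, the consumer flushes when it sees the marker, and the producer blocks until the flush is acknowledged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Toy simulation of the FlushEvent barrier: everything before the marker is
// flushed under the OLD schema before any NEW-schema element may be sent.
public class FlushBarrierSimulation {
    private static final String FLUSH_MARKER = "<FLUSH>"; // stands in for FlushEvent

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        CountDownLatch flushAck = new CountDownLatch(1); // stands in for FlushSuccessEvent

        Thread sinkWriter = new Thread(() -> {
            List<String> buffer = new ArrayList<>();
            try {
                while (true) {
                    String element = stream.take();
                    if (FLUSH_MARKER.equals(element)) {
                        System.out.println("writer: flushing " + buffer + " to sink");
                        buffer.clear();
                        flushAck.countDown(); // acknowledge the flush to the coordinator
                    } else {
                        buffer.add(element);
                    }
                }
            } catch (InterruptedException ignored) {
            }
        });
        sinkWriter.setDaemon(true);
        sinkWriter.start();

        stream.put("row@old-schema");
        stream.put(FLUSH_MARKER); // like output.collect(new StreamRecord<>(new FlushEvent(tableId)))
        flushAck.await();         // like requestSchemaChangeResult(): block until flushed
        stream.put("row@new-schema"); // only now may new-schema rows flow downstream
        Thread.sleep(100);            // give the writer a moment before the JVM exits
    }
}
```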
Finally, finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e))); is executed. In short, these events are produced inside the handler (SchemaRegistryRequestHandler#applySchemaChange), which converts the original SchemaChangeEvent into new events; the conversion again depends on Flink CDC's schema.change.behavior setting, with the resulting types shown below:

![[image-20250106113512324.png]]

How these events are consumed further downstream is not investigated here.
```java
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)
        throws InterruptedException, TimeoutException {
    if (schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION
            && schemaChangeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
        // CreateTableEvent should be applied even in EXCEPTION mode
        throw new RuntimeException(
                String.format(
                        "Refused to apply schema change event %s in EXCEPTION mode.",
                        schemaChangeEvent));
    }

    // The request will block if another schema change event is being handled
    SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); // (1)
    if (response.isAccepted()) { // (3)
        LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId);
        output.collect(new StreamRecord<>(new FlushEvent(tableId))); // (4)
        List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents();
        schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());

        // The request will block until flushing finished in each sink writer
        SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult(); // (5)
        List<SchemaChangeEvent> finishedSchemaChangeEvents =
                schemaEvolveResponse.getFinishedSchemaChangeEvents();

        // Update evolved schema changes based on apply results
        finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));
    } else if (response.isDuplicate()) {
        LOG.info(
                "{}> Schema change event {} has been handled in another subTask already.",
                subTaskId,
                schemaChangeEvent);
    } else if (response.isIgnored()) {
        LOG.info(
                "{}> Schema change event {} has been ignored. No schema evolution needed.",
                subTaskId,
                schemaChangeEvent);
    } else {
        throw new IllegalStateException("Unexpected response status " + response);
    }
}
```
requestSchemaChange is a blocking method: inside while (true) it keeps sending a SchemaChangeRequest until the response is no longer busy.
```java
private SchemaChangeResponse requestSchemaChange(
        TableId tableId, SchemaChangeEvent schemaChangeEvent)
        throws InterruptedException, TimeoutException {
    long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
    while (true) {
        SchemaChangeResponse response =
                sendRequestToCoordinator(
                        new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId));
        if (response.isRegistryBusy()) { // (2)
            if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
                LOG.info(
                        "{}> Schema Registry is busy now, waiting for next request...",
                        subTaskId);
                Thread.sleep(1000);
            } else {
                throw new TimeoutException("TimeOut when requesting schema change");
            }
        } else {
            return response;
        }
    }
}
```
sendRequestToCoordinator goes through org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway, a Flink-internal interface. Its implementations include:

1. org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway
2. org.apache.flink.runtime.taskexecutor.rpc.RpcTaskOperatorEventGateway

We do not dig into their internals here. The request ultimately arrives at SchemaRegistry#handleCoordinationRequest.
```java
private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse>
        RESPONSE sendRequestToCoordinator(REQUEST request) {
    try {
        CompletableFuture<CoordinationResponse> responseFuture =
                toCoordinator.sendRequestToCoordinator(
                        getOperatorID(), new SerializedValue<>(request));
        return CoordinationResponseUtils.unwrap(responseFuture.get());
    } catch (Exception e) {
        throw new IllegalStateException(
                "Failed to send request to coordinator: " + request.toString(), e);
    }
}
```
requestSchemaChangeResult is equally simple: it waits for the result. When its while loop exits and the method returns, the sink side has finished flushing all old-schema data. Before that, SchemaOperator sends no new data downstream, because everything after the FlushEvent already carries the new schema.
```java
private SchemaChangeResultResponse requestSchemaChangeResult()
        throws InterruptedException, TimeoutException {
    CoordinationResponse coordinationResponse =
            sendRequestToCoordinator(new SchemaChangeResultRequest());
    long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
    while (coordinationResponse instanceof SchemaChangeProcessingResponse) { // (6) (7)
        if (System.currentTimeMillis() < nextRpcTimeOutMillis) {
            Thread.sleep(1000);
            coordinationResponse = sendRequestToCoordinator(new SchemaChangeResultRequest());
        } else {
            throw new TimeoutException("TimeOut when requesting release upstream");
        }
    }
    return ((SchemaChangeResultResponse) coordinationResponse);
}
```
toCoordinator.sendRequestToCoordinator again uses Flink's internal RPC path, which we do not study here. The request is received by org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry#handleCoordinationRequest and handled in its if (request instanceof SchemaChangeResultRequest) branch.
SchemaRegistry
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry
Requests sent through toCoordinator.sendRequestToCoordinator are received by handleCoordinationRequest; a SchemaChangeRequest enters the request instanceof SchemaChangeRequest branch and is passed to handleSchemaChangeRequest:
```java
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
        CoordinationRequest request) {
    CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();
    runInEventLoop(
            () -> {
                try {
                    if (request instanceof SchemaChangeRequest) {
                        SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
                        requestHandler.handleSchemaChangeRequest(
                                schemaChangeRequest, responseFuture);
                    } else if (request instanceof SchemaChangeResultRequest) {
                        requestHandler.getSchemaChangeResult(responseFuture);
                    } else if (request instanceof GetEvolvedSchemaRequest) {
                        handleGetEvolvedSchemaRequest(
                                ((GetEvolvedSchemaRequest) request), responseFuture);
                    } else if (request instanceof GetOriginalSchemaRequest) {
                        handleGetOriginalSchemaRequest(
                                (GetOriginalSchemaRequest) request, responseFuture);
                    } else {
                        throw new IllegalArgumentException(
                                "Unrecognized CoordinationRequest type: " + request);
                    }
                } catch (Throwable t) {
                    context.failJob(t);
                    throw t;
                }
            },
            "handling coordination request %s",
            request);
    return responseFuture;
}
```
SchemaRegistry#handleEventFromOperator handles the FlushSuccessEvent sent by DataSinkWriterOperator#handleFlushEvent. Again, the concrete logic is delegated to the handler, SchemaRegistryRequestHandler#flushSuccess:
```java
@Override
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
    runInEventLoop(
            () -> {
                try {
                    if (event instanceof FlushSuccessEvent) {
                        FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
                        LOG.info(
                                "Sink subtask {} succeed flushing for table {}.",
                                flushSuccessEvent.getSubtask(),
                                flushSuccessEvent.getTableId().toString());
                        requestHandler.flushSuccess(
                                flushSuccessEvent.getTableId(),
                                flushSuccessEvent.getSubtask(),
                                currentParallelism);
                    } else if (event instanceof SinkWriterRegisterEvent) {
                        requestHandler.registerSinkWriter(
                                ((SinkWriterRegisterEvent) event).getSubtask());
                    } else {
                        throw new FlinkException("Unrecognized Operator Event: " + event);
                    }
                } catch (Throwable t) {
                    context.failJob(t);
                    throw t;
                }
            },
            "handling event %s from subTask %d",
            event,
            subtask);
}
```
SchemaRegistryRequestHandler
SchemaRegistryRequestHandler is the executor behind SchemaRegistry. Its field schemaChangeStatus tracks the handler's own state, and pendingSubTaskIds records the subtask IDs with pending requests (the operator may run with parallelism greater than 1, so every parallel subtask can file its own request). In handleSchemaChangeRequest:

1. pendingSubTaskIds is empty: handle this request.
2. The head of pendingSubTaskIds equals the incoming requestSubTaskId: remove the head and handle the request.
3. Otherwise (pendingSubTaskIds is non-empty): return SchemaChangeResponse.busy() right away; this busy is exactly what response.isRegistryBusy() checks in SchemaOperator.

On the accepted path, calculateDerivedSchemaChangeEvents rewrites the event according to Flink CDC's schema evolution strategy; for example, an event can be ignored by returning an empty list.
"schema.change.behavior is of enum type, and could be set to exception, evolve, try_evolve, lenient or ignore."

The handler then switches its status to WAITING_FOR_FLUSH and responds with ResponseCode.ACCEPTED, at which point control returns to SchemaOperator#handleSchemaChangeEvent. A small sketch of the "ignore by returning an empty list" idea follows.
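The body of calculateDerivedSchemaChangeEvents is not reproduced in this article, but the mechanism can be sketched in a few lines. This is a hypothetical, heavily simplified illustration with invented names, not the real implementation:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of behavior-driven event derivation: an IGNORE-like
// behavior drops the event by returning an empty list, which the caller
// answers with SchemaChangeResponse.ignored(); other behaviors pass the
// event through (the real code may also rewrite or route it).
public class BehaviorFilterSketch {
    enum Behavior { EXCEPTION, EVOLVE, TRY_EVOLVE, LENIENT, IGNORE }

    static List<String> deriveEvents(String changeEvent, Behavior behavior) {
        if (behavior == Behavior.IGNORE) {
            return Collections.emptyList(); // empty result => request is "ignored"
        }
        return List.of(changeEvent); // pass through, possibly rewritten in real code
    }

    public static void main(String[] args) {
        System.out.println(deriveEvents("AddColumnEvent", Behavior.IGNORE)); // []
        System.out.println(deriveEvents("AddColumnEvent", Behavior.EVOLVE)); // [AddColumnEvent]
    }
}
```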
SchemaRegistryRequestHandler#handleSchemaChangeRequest:

```java
/**
 * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing.
 *
 * @param request the received SchemaChangeRequest
 */
public void handleSchemaChangeRequest(
        SchemaChangeRequest request, CompletableFuture<CoordinationResponse> response) {

    // We use requester subTask ID as the pending ticket, because there will be at most 1 schema
    // change requests simultaneously from each subTask
    int requestSubTaskId = request.getSubTaskId();

    synchronized (schemaChangeRequestLock) {
        // Make sure we handle the first request in the pending list to avoid out-of-order
        // waiting and blocks checkpointing mechanism.
        if (schemaChangeStatus == RequestStatus.IDLE) {
            if (pendingSubTaskIds.isEmpty()) {
                LOG.info(
                        "Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.",
                        request.getSchemaChangeEvent(),
                        request.getTableId().toString(),
                        requestSubTaskId);
            } else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
                LOG.info(
                        "Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.",
                        request.getSchemaChangeEvent(),
                        request.getTableId().toString(),
                        requestSubTaskId);
                pendingSubTaskIds.remove(0);
            } else {
                LOG.info(
                        "Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).",
                        request.getSchemaChangeEvent(),
                        request.getTableId().toString(),
                        requestSubTaskId,
                        pendingSubTaskIds);
                if (!pendingSubTaskIds.contains(requestSubTaskId)) {
                    pendingSubTaskIds.add(requestSubTaskId);
                }
                response.complete(wrap(SchemaChangeResponse.busy())); // (2)
                return;
            }

            SchemaChangeEvent event = request.getSchemaChangeEvent();

            // If this schema change event has been requested by another subTask, ignore it.
            if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
                LOG.info("Event {} has been addressed before, ignoring it.", event);
                clearCurrentSchemaChangeRequest();
                LOG.info(
                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
                        request);
                response.complete(wrap(SchemaChangeResponse.duplicate()));
                return;
            }
            schemaManager.applyOriginalSchemaChange(event);
            List<SchemaChangeEvent> derivedSchemaChangeEvents =
                    calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); // (14)

            // If this schema change event is filtered out by LENIENT mode or merging table
            // route strategies, ignore it.
            if (derivedSchemaChangeEvents.isEmpty()) {
                LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
                clearCurrentSchemaChangeRequest();
                LOG.info(
                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
                        request);
                response.complete(wrap(SchemaChangeResponse.ignored()));
                return;
            }

            LOG.info(
                    "SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.");
            // This request has been accepted.
            schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH; // (3)
            currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);

            response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents))); // (3)
        } else {
            LOG.info(
                    "Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",
                    request,
                    requestSubTaskId,
                    pendingSubTaskIds);
            if (!pendingSubTaskIds.contains(requestSubTaskId)) {
                pendingSubTaskIds.add(requestSubTaskId);
            }
            response.complete(wrap(SchemaChangeResponse.busy())); // (2)
        }
    }
}
```
SchemaRegistryRequestHandler#getSchemaChangeResult simply checks the member field SchemaRegistryRequestHandler#schemaChangeStatus:
1. FINISHED: reset the handler state and return the finished result.
2. Not FINISHED: return a SchemaChangeProcessingResponse; on receiving it, SchemaOperator#requestSchemaChangeResult keeps looping (blocking) in its while loop.
```java
public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> response) {
    Preconditions.checkState(
            schemaChangeStatus != RequestStatus.IDLE,
            "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
    if (schemaChangeStatus == RequestStatus.FINISHED) { // (12)
        schemaChangeStatus = RequestStatus.IDLE;
        LOG.info(
                "SchemaChangeStatus switched from FINISHED to IDLE for request {}",
                currentDerivedSchemaChangeEvents);

        // This request has been finished, return it and prepare for the next request
        List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();
        SchemaChangeResultResponse resultResponse =
                new SchemaChangeResultResponse(finishedEvents);
        response.complete(wrap(resultResponse));
    } else {
        // Still working on schema change request, waiting it
        response.complete(wrap(new SchemaChangeProcessingResponse()));
    }
}
```
flushSuccess handles the FlushSuccessEvent reported back by DataSinkWriterOperator. This part is slightly subtle. activeSinkWriters records the indices of all registered writers: the writer may run with parallelism greater than 1, and each FlushSuccessEvent comes from a single writer subtask, so the handler must wait until every writer has flushed before it can be sure that all pre-change data has reached the database.

1. if (activeSinkWriters.size() < parallelism) covers the case where not all writers have registered yet.
2. if (flushedSinkWriters.equals(activeSinkWriters)) means every writer has flushed; the handler switches its status to RequestStatus.APPLYING, meaning it is now applying the schema change, and submits applySchemaChange.
```java
/**
 * Record flushed sink subtasks after receiving FlushSuccessEvent.
 *
 * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about
 * @param sinkSubtask the sink subtask succeed flushing
 */
public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
    flushedSinkWriters.add(sinkSubtask);
    if (activeSinkWriters.size() < parallelism) {
        LOG.info(
                "Not all active sink writers have been registered. Current {}, expected {}.",
                activeSinkWriters.size(),
                parallelism);
        return;
    }
    if (flushedSinkWriters.equals(activeSinkWriters)) {
        Preconditions.checkState(
                schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH,
                "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
                        + schemaChangeStatus);
        schemaChangeStatus = RequestStatus.APPLYING; // (9)
        LOG.info(
                "All sink subtask have flushed for table {}. Start to apply schema change.",
                tableId.toString());
        schemaChangeThreadPool.submit(
                () -> applySchemaChange(tableId, currentDerivedSchemaChangeEvents));
    }
}
```
SchemaRegistryRequestHandler#applySchemaChange essentially calls metadataApplier.applySchemaChange(changeEvent) to perform the actual table structure change, and schemaManager.applyEvolvedSchemaChange(changeEvent) to record the evolved schema. The interface org.apache.flink.cdc.common.sink.MetadataApplier is documented as:
"{@code MetadataApplier} is used to apply metadata changes to external systems."

So the MetadataApplier applies schema changes to the external system, which in practice is the sink database. A typical implementation takes the requested change, assembles the corresponding DDL, and sends it to the database for execution.
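To illustrate the "receive the change, assemble DDL, execute it" pattern, here is a minimal sketch in plain JDBC. The class, method, and event shape are invented for illustration; this is not the real MetadataApplier API (the actual Doris implementation is shown below):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical "assemble DDL and execute" applier; not the real MetadataApplier API.
public class JdbcDdlApplierSketch {
    private final String jdbcUrl; // e.g. a sink database connection string

    public JdbcDdlApplierSketch(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    /** Turns an "add column" change into DDL and runs it on the target database. */
    public void applyAddColumn(String table, String column, String type) throws SQLException {
        String ddl = String.format("ALTER TABLE %s ADD COLUMN %s %s", table, column, type);
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
                Statement stmt = conn.createStatement()) {
            stmt.execute(ddl); // real appliers add quoting, existence checks and retries
        }
    }
}
```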
At the end, the handler status becomes RequestStatus.FINISHED. It may look as if the FlushSuccessEvent never propagates back to SchemaOperator, but it does not need to: SchemaOperator keeps polling SchemaRegistry (SchemaOperator#requestSchemaChangeResult), and SchemaRegistry chooses the response type from the handler status (SchemaRegistryRequestHandler#getSchemaChangeResult). Once the status is RequestStatus.FINISHED, SchemaRegistry completes the CompletableFuture with something other than SchemaChangeProcessingResponse, and SchemaOperator stops blocking and resumes sending data downstream.

```java
/**
 * Apply the schema change to the external system.
 *
 * @param tableId the table need to change schema
 * @param derivedSchemaChangeEvents list of the schema changes
 */
private void applySchemaChange(
        TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
    for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
        if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
            if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
                currentIgnoredSchemaChanges.add(changeEvent);
                continue;
            }
        }
        if (!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) {
            LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId);
            currentIgnoredSchemaChanges.add(changeEvent);
        } else {
            try {
                metadataApplier.applySchemaChange(changeEvent);
                LOG.info("Applied schema change {} to table {}.", changeEvent, tableId);
                schemaManager.applyEvolvedSchemaChange(changeEvent);
                currentFinishedSchemaChanges.add(changeEvent);
            } catch (Throwable t) {
                LOG.error(
                        "Failed to apply schema change {} to table {}. Caused by: {}",
                        changeEvent,
                        tableId,
                        t);
                if (!shouldIgnoreException(t)) {
                    currentChangeException = t;
                    break;
                } else {
                    LOG.warn(
                            "Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",
                            changeEvent,
                            t);
                }
            }
        }
    }
    Preconditions.checkState(
            schemaChangeStatus == RequestStatus.APPLYING,
            "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "
                    + schemaChangeStatus);
    schemaChangeStatus = RequestStatus.FINISHED;
    LOG.info(
            "SchemaChangeStatus switched from APPLYING to FINISHED for request {}.",
            currentDerivedSchemaChangeEvents);
}
```
SchemaRegistryRequestHandler.RequestStatus enumerates the handler states; the transitions are documented in the source:
```java
// Schema change event state could transfer in the following way:
//
//        -------- B --------
//        |                 |
//        v                 |
//    --------          ---------------------
//    | IDLE | -- A --> | WAITING_FOR_FLUSH |
//    --------          ---------------------
//        ^                       |
//        E                       C
//         \                      v
//    ------------          ------------
//    | FINISHED | <- D --  | APPLYING |
//    ------------          ------------
//
//  A: When a request came to an idling request handler.
//  B: When current request is duplicate or ignored by LENIENT / routed table merging
//  strategies.
//  C: When schema registry collected enough flush success events, and actually started to apply
//  schema changes.
//  D: When schema change application finishes (successfully or with exceptions)
//  E: When current schema change request result has been retrieved by SchemaOperator, and ready
//  for the next request.
private enum RequestStatus {
    IDLE,
    WAITING_FOR_FLUSH,
    APPLYING,
    FINISHED
}
```
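As an aside, the A-E transition rules above are easy to capture in a small guard. The following is a hypothetical sketch (not CDC code) that validates exactly those edges:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Hypothetical sketch: a guard that only allows the A-E transitions from the diagram above.
public class RequestStatusGuard {
    enum Status { IDLE, WAITING_FOR_FLUSH, APPLYING, FINISHED }

    private static final Map<Status, EnumSet<Status>> LEGAL = new EnumMap<>(Map.of(
            Status.IDLE, EnumSet.of(Status.WAITING_FOR_FLUSH),                  // A
            Status.WAITING_FOR_FLUSH, EnumSet.of(Status.IDLE, Status.APPLYING), // B, C
            Status.APPLYING, EnumSet.of(Status.FINISHED),                       // D
            Status.FINISHED, EnumSet.of(Status.IDLE)));                         // E

    private Status current = Status.IDLE;

    public void transitionTo(Status next) {
        if (!LEGAL.get(current).contains(next)) {
            throw new IllegalStateException(current + " -> " + next + " is not a legal transition");
        }
        current = next;
    }

    public static void main(String[] args) {
        RequestStatusGuard guard = new RequestStatusGuard();
        guard.transitionTo(Status.WAITING_FOR_FLUSH); // A: request accepted
        guard.transitionTo(Status.APPLYING);          // C: all writers flushed
        guard.transitionTo(Status.FINISHED);          // D: DDL applied
        guard.transitionTo(Status.IDLE);              // E: result retrieved
    }
}
```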
Next, let's look at how the sink side handles these events.
DataSinkWriterOperator
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator#processElement method. The key point is the handling of FlushEvent:

```java
@Override
public void processElement(StreamRecord<Event> element) throws Exception {
    Event event = element.getValue();

    // FlushEvent triggers flush
    if (event instanceof FlushEvent) {
        handleFlushEvent(((FlushEvent) event));
        return;
    }

    // CreateTableEvent marks the table as processed directly
    if (event instanceof CreateTableEvent) {
        processedTableIds.add(((CreateTableEvent) event).tableId());
        this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
                .processElement(element);
        return;
    }

    // Check if the table is processed before emitting all other events, because we have to make
    // sure that sink have a view of the full schema before processing any change events,
    // including schema changes.
    ChangeEvent changeEvent = (ChangeEvent) event;
    if (!processedTableIds.contains(changeEvent.tableId())) {
        emitLatestSchema(changeEvent.tableId());
        processedTableIds.add(changeEvent.tableId());
    }
    processedTableIds.add(changeEvent.tableId());
    this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
            .processElement(element);
}
```
handleFlushEvent performs just two operations:
1. flush: write all data received so far into the target database, comparable to a JDBC commit.
2. Send an event: report FlushSuccess via notifyFlushSuccess; see the SchemaEvolutionClient class below.
```java
private void handleFlushEvent(FlushEvent event) throws Exception {
    copySinkWriter.flush(false); // (8)
    schemaEvolutionClient.notifyFlushSuccess(
            getRuntimeContext().getIndexOfThisSubtask(), event.getTableId()); // (9)
}
```
SchemaEvolutionClient
org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient
org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient#notifyFlushSuccess sends a FlushSuccessEvent to SchemaRegistry#handleEventFromOperator:
```java
public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException {
    toCoordinator.sendOperatorEventToCoordinator(
            schemaOperatorID, new SerializedValue<>(new FlushSuccessEvent(subtask, tableId)));
}
```
TaskOperatorEventGateway
org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway. The toCoordinator fields in both SchemaOperator and DataSinkWriterOperator are instances of this interface.
```java
/**
 * Gateway to send an {@link OperatorEvent} or {@link CoordinationRequest} from a Task to the
 * {@link OperatorCoordinator} on the JobManager side.
 *
 * <p>This is the first step in the chain of sending Operator Events and Requests from Operator to
 * Coordinator. Each layer adds further context, so that the inner layers do not need to know about
 * the complete context, which keeps dependencies small and makes testing easier.
 *
 * <ul>
 *   <li>{@code OperatorEventGateway} takes the event, enriches the event with the {@link
 *       OperatorID}, and forwards it to:
 *   <li>{@code TaskOperatorEventGateway} enriches the event with the {@link ExecutionAttemptID}
 *       and forwards it to the:
 *   <li>{@code JobMasterOperatorEventGateway} which is RPC interface from the TaskManager to the
 *       JobManager.
 * </ul>
 */
public interface TaskOperatorEventGateway {

    /**
     * Sends an event from the operator (identified by the given operator ID) to the operator
     * coordinator (identified by the same ID).
     */
    void sendOperatorEventToCoordinator(OperatorID operator, SerializedValue<OperatorEvent> event);

    /**
     * Sends a request from current operator to a specified operator coordinator which is
     * identified by the given operator ID and return the response.
     */
    CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
            OperatorID operator, SerializedValue<CoordinationRequest> request);
}
```
MetadataApplier
org.apache.flink.cdc.common.sink.MetadataApplier. This interface is responsible for turning schema change events into DDL and executing it against the target sink database.
```java
/** {@code MetadataApplier} is used to apply metadata changes to external systems. */
@PublicEvolving
public interface MetadataApplier extends Serializable {

    /** Apply the given {@link SchemaChangeEvent} to external systems. */
    void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws SchemaEvolveException; // (10)

    /** Sets enabled schema evolution event types of current metadata applier. */
    default MetadataApplier setAcceptedSchemaEvolutionTypes(
            Set<SchemaChangeEventType> schemaEvolutionTypes) {
        return this;
    }

    /** Checks if this metadata applier should this event type. */
    default boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
        return true;
    }

    /** Checks what kind of schema change events downstream can handle. */
    default Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
        return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
    }
}
```
DorisMetadataApplier
org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier implements MetadataApplier. Its org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier#applySchemaChange looks like this:
```java
// (10)
@Override
public void applySchemaChange(SchemaChangeEvent event) throws SchemaEvolveException {
    try {
        // send schema change op to doris
        if (event instanceof CreateTableEvent) {
            applyCreateTableEvent((CreateTableEvent) event);
        } else if (event instanceof AddColumnEvent) {
            applyAddColumnEvent((AddColumnEvent) event);
        } else if (event instanceof DropColumnEvent) {
            applyDropColumnEvent((DropColumnEvent) event);
        } else if (event instanceof RenameColumnEvent) {
            applyRenameColumnEvent((RenameColumnEvent) event);
        } else if (event instanceof AlterColumnTypeEvent) {
            applyAlterColumnTypeEvent((AlterColumnTypeEvent) event);
        } else {
            throw new UnsupportedSchemaChangeEventException(event);
        }
    } catch (Exception ex) {
        throw new SchemaEvolveException(event, ex.getMessage(), null);
    }
}
```
Take applyAddColumnEvent as an example; it only performs some conversion before delegating:
```java
private void applyAddColumnEvent(AddColumnEvent event)
        throws IOException, IllegalArgumentException {
    TableId tableId = event.tableId();
    List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns();
    for (AddColumnEvent.ColumnWithPosition col : addedColumns) {
        Column column = col.getAddColumn();
        FieldSchema addFieldSchema =
                new FieldSchema(
                        column.getName(),
                        buildTypeString(column.getType()),
                        column.getDefaultValueExpression(),
                        column.getComment());
        schemaChangeManager.addColumn(
                tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
    }
}
```
SchemaChangeManager
org.apache.doris.flink.sink.schema.SchemaChangeManager
org.apache.doris.flink.sink.schema.SchemaChangeManager#addColumn method. SchemaChangeHelper assembles the SQL, and the schemaChange method sends the SQL to the database for execution:
```java
public boolean addColumn(String database, String table, FieldSchema field)
        throws IOException, IllegalArgumentException {
    if (checkColumnExists(database, table, field.getName())) {
        LOG.warn(
                "The column {} already exists in table {}, no need to add it again",
                field.getName(),
                table);
        return true;
    }
    String tableIdentifier = getTableIdentifier(database, table);
    String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field);
    return schemaChange(
            database, table, buildRequestParam(false, field.getName()), addColumnDDL);
}
```
SchemaChangeHelper
org.apache.doris.flink.sink.schema.SchemaChangeHelper
org.apache.doris.flink.sink.schema.SchemaChangeHelper#buildAddColumnDDL builds the SQL from the ADD_DDL string template:
```java
// (11)
private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";

public static String buildAddColumnDDL(String tableIdentifier, FieldSchema fieldSchema) {
    String name = fieldSchema.getName();
    String type = fieldSchema.getTypeString();
    String defaultValue = fieldSchema.getDefaultValue();
    String comment = fieldSchema.getComment();
    StringBuilder addDDL =
            new StringBuilder(
                    String.format(
                            ADD_DDL,
                            DorisSchemaFactory.quoteTableIdentifier(tableIdentifier),
                            DorisSchemaFactory.identifier(name),
                            type));
    if (defaultValue != null) {
        addDDL.append(" DEFAULT ").append(DorisSchemaFactory.quoteDefaultValue(defaultValue));
    }
    commentColumn(addDDL, comment);
    return addDDL.toString();
}
```
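For a feel of the output: under the template above, adding an INT column named age with default value 0 should yield DDL roughly like ALTER TABLE \`db\`.\`tbl\` ADD COLUMN \`age\` INT DEFAULT '0'. The exact quoting depends on DorisSchemaFactory, so treat this as an illustrative assumption rather than verified output.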
Process Summary

1. SchemaOperator receives a SchemaChangeEvent and sends a SchemaChangeRequest to SchemaRegistry.
2. SchemaRegistry delegates to its executor SchemaRegistryRequestHandler (the "handler" below). The handler keeps its own state in schemaChangeStatus and checks whether a previous request is still being processed: if so, it responds with busy; otherwise it responds with accepted and switches its state from RequestStatus.IDLE to RequestStatus.WAITING_FOR_FLUSH.
3. On busy, SchemaOperator sleeps and retries, effectively blocking. On accepted, it emits one FlushEvent downstream, then sends a SchemaChangeResultRequest to SchemaRegistry and waits for the result: a SchemaChangeProcessingResponse means the schema change has not finished yet, so it sleeps and retries, blocking until a non-SchemaChangeProcessingResponse arrives. While blocked, it sends no new-schema data downstream.
4. When SchemaRegistry receives the SchemaChangeResultRequest, the handler checks its schemaChangeStatus; unless it is RequestStatus.FINISHED, it returns a SchemaChangeProcessingResponse.
5. DataSinkWriterOperator receives the FlushEvent, flushes all old-schema data received so far into the database, and sends a FlushSuccessEvent to SchemaRegistry.
6. The handler collects FlushSuccessEvents; once every subtask has reported, it switches to RequestStatus.APPLYING and uses the MetadataApplier to change the table schema in the external sink database, then switches to RequestStatus.FINISHED.
7. The next time SchemaOperator sends a SchemaChangeResultRequest while the handler state is RequestStatus.FINISHED, SchemaRegistry returns a non-SchemaChangeProcessingResponse; SchemaOperator stops blocking and resumes sending new-schema data downstream.
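To tie the seven steps together, here is a toy single-process simulation of the handshake. All names are invented, and the RPC round-trips are faked with a latch and a polling loop:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Toy end-to-end run of the protocol summarized above (not Flink code).
public class SchemaChangeHandshakeSimulation {
    enum Status { IDLE, WAITING_FOR_FLUSH, APPLYING, FINISHED }

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Status> status = new AtomicReference<>(Status.IDLE);
        CountDownLatch flushAck = new CountDownLatch(1);

        // "SchemaRegistry": waits for all writers' acks, then applies the DDL.
        Thread registry = new Thread(() -> {
            try {
                flushAck.await();            // all FlushSuccessEvents collected
                status.set(Status.APPLYING); // (9)
                System.out.println("registry: applying DDL to the sink database");
                status.set(Status.FINISHED);
            } catch (InterruptedException ignored) {
            }
        });
        registry.start();

        // "SchemaOperator": request accepted, emit the barrier, then poll.
        status.set(Status.WAITING_FOR_FLUSH);
        System.out.println("operator: FlushEvent emitted downstream");

        // "DataSinkWriterOperator": flush old-schema rows, then ack.
        System.out.println("writer: old-schema rows flushed");
        flushAck.countDown();

        // Like requestSchemaChangeResult(): loop while still "processing".
        while (status.get() != Status.FINISHED) {
            Thread.sleep(10);
        }
        System.out.println("operator: unblocked, sending new-schema rows downstream");
        registry.join();
    }
}
```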
2. Flowchart

The numbered markers in the diagram below also appear as comments in the source snippets above, so you can search for them in the code.