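As of the current 3.0 release, Debezium's Oracle connector does not support the purge table statement.
So when Debezium is parsing Oracle changes, running a statement such as purge table $BIN… on the source database makes Debezium stop working, and the log shows: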
Mining session stopped due to error.
io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement purge table BIN$rvZfTNVaRv3dgTgQzBLuw$0;
mismatched input table expecting {EOF, /, ;}
    at io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]
    at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41) ~[antlr4-runtime-4.10.1.jar:4.10.1]
    at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:543) ~[antlr4-runtime-4.10.1.jar:4.10.1]
    at org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327) ~[antlr4-runtime-4.10.1.jar:4.10.1]
    at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139) ~[antlr4-runtime-4.10.1.jar:4.10.1]
    at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2211) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:74) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:32) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:76) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.connector.oracle.antlr.OracleDdlParser.parse(OracleDdlParser.java:69) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:104) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:388) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.handleSchemaChange(AbstractLogMinerEventProcessor.java:1016) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:514) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:439) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:288) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:243) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:62) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:324) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:203) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:143) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.antlr.v4.runtime.InputMismatchException: null
    at org.antlr.v4.runtime.DefaultErrorStrategy.sync(DefaultErrorStrategy.java:270) ~[antlr4-runtime-4.10.1.jar:4.10.1]
    at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2143) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]
    ... 21 common frames omitted
This error is then handled as follows:
io.debezium.pipeline.ErrorHandler : Producer failure
io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement purge table BIN$rvZfTNVaRv3dgTgQzBLuw$0;
mismatched input table expecting {EOF, /, ;}
    at io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]
    at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41) ~[antlr4-runtime-4.10.1.jar:4.10.1]
    at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:543) ~[antlr4-runtime-4.10.1.jar:4.10.1]
    at org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327) ~[antlr4-runtime-4.10.1.jar:4.10.1]
    at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139) ~[antlr4-runtime-4.10.1.jar:4.10.1]
    at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2211) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]
    at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:74) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
……
In effect, the whole pipeline stops and can no longer do any work.
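Buried in this long error output is also the following:
Please open a Jira issue with the statement purge table BIN$rvZfTNVaRv3dgTgQzBLuw$0;
mismatched input table expecting {EOF, /, ;}
It looks as if the only option is to file an issue on the project page and wait for a fix.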
Ignoring the error
But if we are sure this statement does not need to be parsed, can we simply ignore it rather than stop?
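Yes, we can. Set the option schema.history.internal.skip.unparseable.ddl, which defaults to false, to true:
properties.setProperty("schema.history.internal.skip.unparseable.ddl", "true");
The official documentation flags this value as one to treat with care: enable it only after confirming that the statements in question can safely be ignored.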
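For reference, here is a minimal, self-contained sketch of that one-line setting using only java.util.Properties. The class name SkipUnparseableDdlConfig and the helper buildProperties are hypothetical names chosen for illustration; only the property key comes from the text above. It assumes the properties object is later handed to whatever starts the connector, which is not shown here.

```java
import java.util.Properties;

public class SkipUnparseableDdlConfig {

    // Hypothetical helper: returns properties with unparseable-DDL skipping enabled.
    static Properties buildProperties() {
        Properties properties = new Properties();
        // Property key taken from the article; values are passed as strings.
        properties.setProperty("schema.history.internal.skip.unparseable.ddl", "true");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = buildProperties();
        // Fall back to "false", the documented default, when the key is absent.
        boolean skip = Boolean.parseBoolean(
                properties.getProperty("schema.history.internal.skip.unparseable.ddl", "false"));
        System.out.println("skip unparseable DDL = " + skip);
    }
}
```

Note that both arguments to setProperty are strings, so the value is written as "true" rather than as a boolean literal.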
In the manual's words:
A Boolean value that specifies whether the connector should ignore malformed or unknown database statements or stop processing so a human can fix the issue. The safe default is false. Skipping should be used only with care as it can lead to data loss or mangling when the binlog is being processed.
Source code
Having sidestepped the problem, we are still curious how this actually happens, so let's trace the source.
After some digging: in the debezium-connector-oracle package, the source file OracleSchemaChangeEventEmitter.class contains the method that parses schema changes, emitSchemaChangeEvent, defined as:
public void emitSchemaChangeEvent(SchemaChangeEventEmitter.Receiver receiver) throws InterruptedException {
    Table tableBefore = this.schema.tableFor(this.tableId);
    OracleDdlParser parser = this.schema.getDdlParser();
    DdlChanges ddlChanges = parser.getDdlChanges();
    try {
        ddlChanges.reset();
        parser.setCurrentDatabase(this.sourceDatabaseName);
        parser.setCurrentSchema(this.objectOwner);
        parser.parse(this.ddlText, this.schema.getTables());
    } catch (MultipleParsingExceptions | ParsingException e) {
        if (!this.schema.skipUnparseableDdlStatements()) {
            throw e;
        }
        LOGGER.warn("Ignoring unparsable DDL statement {}:", this.ddlText, e);
        this.streamingMetrics.incrementWarningCount();
        this.streamingMetrics.incrementSchemaChangeParseErrorCount();
    }
    ……
As we can see, when this.schema.skipUnparseableDdlStatements() returns true, the connector only logs a warning (and bumps two metrics counters) and carries on; the exception is not rethrown.
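The skip-or-rethrow shape of that catch block can be tried in isolation. The following is a minimal sketch, not Debezium code: DdlParseException, the toy parse method (which rejects anything starting with "purge"), and process are all hypothetical stand-ins, assumed only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class SkipOrFailSketch {

    // Hypothetical stand-in for a DDL parsing exception.
    static class DdlParseException extends RuntimeException {
        DdlParseException(String message) {
            super(message);
        }
    }

    // Hypothetical toy parser: treats any statement starting with "purge" as unparseable.
    static void parse(String ddl) {
        if (ddl.trim().toLowerCase(Locale.ROOT).startsWith("purge")) {
            throw new DdlParseException("DDL statement couldn't be parsed: " + ddl);
        }
    }

    // Same shape as the catch block above: rethrow unless skipping is enabled.
    // Returns the statements that were skipped.
    static List<String> process(List<String> statements, boolean skipUnparseableDdl) {
        List<String> skipped = new ArrayList<>();
        for (String ddl : statements) {
            try {
                parse(ddl);
            } catch (DdlParseException e) {
                if (!skipUnparseableDdl) {
                    throw e; // the whole run stops here
                }
                System.err.println("Ignoring unparsable DDL statement " + ddl);
                skipped.add(ddl);
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        List<String> ddl = List.of("create table t (id number)", "purge table t_old");
        System.out.println("skipped with flag on: " + process(ddl, true));
        try {
            process(ddl, false);
        } catch (DdlParseException e) {
            System.out.println("stopped with flag off: " + e.getMessage());
        }
    }
}
```

With the flag on, the offending statement is recorded and processing continues; with it off, the first unparseable statement aborts the loop, which is the behaviour seen in the log earlier.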
skipUnparseableDdlStatements itself is defined in HistorizedRelationalDatabaseSchema.class in debezium-core:
public boolean skipUnparseableDdlStatements() {
    return this.historizedConnectorConfig.skipUnparseableDdlStatements();
}
It simply delegates to the method of the same name on historizedConnectorConfig, which is a HistorizedRelationalDatabaseConnectorConfig.
Following into that file, the method just returns a field:
public boolean skipUnparseableDdlStatements() {
    return this.skipUnparseableDDL;
}
The field skipUnparseableDDL is assigned in the constructor:
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass,
                                                      Configuration config,
                                                      Tables.TableFilter systemTablesFilter,
                                                      Selectors.TableIdToStringMapper tableIdMapper,
                                                      boolean useCatalogBeforeSchema,
                                                      int defaultSnapshotFetchSize,
                                                      ColumnFilterMode columnFilterMode,
                                                      boolean multiPartitionMode) {
    super(config, systemTablesFilter, tableIdMapper, defaultSnapshotFetchSize, columnFilterMode, useCatalogBeforeSchema);
    this.useCatalogBeforeSchema = useCatalogBeforeSchema;
    this.connectorClass = connectorClass;
    this.multiPartitionMode = multiPartitionMode;
    this.ddlFilter = this.createDdlFilter(config);
    this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
    this.storeOnlyCapturedTablesDdl = config.getBoolean(STORE_ONLY_CAPTURED_TABLES_DDL);
    this.storeOnlyCapturedDatabasesDdl = config.getBoolean(STORE_ONLY_CAPTURED_DATABASES_DDL);
}
So the value is the boolean read from config for the SKIP_UNPARSEABLE_DDL_STATEMENTS field.
SKIP_UNPARSEABLE_DDL_STATEMENTS is in turn assigned in this class's static initializer:
static {
    SCHEMA_HISTORY = Field.create("schema.history.internal")
            .withDisplayName("Database schema history class")
            .withType(Type.CLASS)
            .withWidth(Width.LONG)
            .withImportance(Importance.LOW)
            .withInvisibleRecommender()
            .withDescription("The name of the SchemaHistory class that should be used to store and recover database schema changes. The configuration properties for the history are prefixed with the schema.history.internal. string.")
            .withDefault("io.debezium.storage.kafka.history.KafkaSchemaHistory");
    SKIP_UNPARSEABLE_DDL_STATEMENTS = SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS;
    STORE_ONLY_CAPTURED_TABLES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL;
    STORE_ONLY_CAPTURED_DATABASES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_DATABASES_DDL;
    CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
            .history(new Field[]{SCHEMA_HISTORY, SKIP_UNPARSEABLE_DDL_STATEMENTS, STORE_ONLY_CAPTURED_TABLES_DDL, STORE_ONLY_CAPTURED_DATABASES_DDL})
            .create();
}
So it is really just SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, which is declared as a constant in the SchemaHistory interface:
public interface SchemaHistory {
    String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal.";
    Field NAME = Field.create("schema.history.internal.name")
            .withDisplayName("Logical name for the database schema history")
            .withType(Type.STRING)
            .withWidth(Width.MEDIUM)
            .withImportance(Importance.LOW)
            .withDescription("The name used for the database schema history, perhaps differently by each implementation.")
            .withValidation(new Field.Validator[]{Field::isOptional});
    Field SKIP_UNPARSEABLE_DDL_STATEMENTS = Field.create("schema.history.internal.skip.unparseable.ddl")
            .withDisplayName("Skip DDL statements that cannot be parsed")
            .withType(Type.BOOLEAN)
            .withWidth(Width.SHORT)
            .withImportance(Importance.LOW)
            .withDescription("Controls the action Debezium will take when it meets a DDL statement in binlog, that it cannot parse.By default the connector will stop operating but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes.")
            .withDefault(false);
    ……
Having traced it all the way down, I have to say these parameters are wrapped up and hidden remarkably deep.
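To make the chain easier to hold in one's head, here is a condensed sketch of the same layering: a field constant with a name and a default, a config object that reads it once in its constructor, and a schema object that delegates to the config. Every class here (ConfigChainSketch, BooleanField, ConnectorConfigSketch, SchemaSketch) is a hypothetical miniature written for illustration, not Debezium's actual types; only the property key and the method and field names skipUnparseableDdlStatements and skipUnparseableDDL are taken from the excerpts above.

```java
import java.util.Map;

public class ConfigChainSketch {

    // Hypothetical miniature of a config field: property name plus default value.
    static class BooleanField {
        final String name;
        final boolean defaultValue;

        BooleanField(String name, boolean defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    // Layer 1: the constant, declared once with its default of false.
    static final BooleanField SKIP_UNPARSEABLE_DDL_STATEMENTS =
            new BooleanField("schema.history.internal.skip.unparseable.ddl", false);

    // Layer 2: the connector config reads the value in its constructor and exposes a getter.
    static class ConnectorConfigSketch {
        private final boolean skipUnparseableDDL;

        ConnectorConfigSketch(Map<String, String> rawConfig) {
            String value = rawConfig.get(SKIP_UNPARSEABLE_DDL_STATEMENTS.name);
            this.skipUnparseableDDL = (value == null)
                    ? SKIP_UNPARSEABLE_DDL_STATEMENTS.defaultValue
                    : Boolean.parseBoolean(value);
        }

        boolean skipUnparseableDdlStatements() {
            return this.skipUnparseableDDL;
        }
    }

    // Layer 3: the schema object simply delegates to the config.
    static class SchemaSketch {
        private final ConnectorConfigSketch historizedConnectorConfig;

        SchemaSketch(ConnectorConfigSketch config) {
            this.historizedConnectorConfig = config;
        }

        boolean skipUnparseableDdlStatements() {
            return this.historizedConnectorConfig.skipUnparseableDdlStatements();
        }
    }

    public static void main(String[] args) {
        SchemaSketch byDefault = new SchemaSketch(new ConnectorConfigSketch(Map.of()));
        SchemaSketch enabled = new SchemaSketch(new ConnectorConfigSketch(
                Map.of("schema.history.internal.skip.unparseable.ddl", "true")));
        System.out.println("default: " + byDefault.skipUnparseableDdlStatements());
        System.out.println("enabled: " + enabled.skipUnparseableDdlStatements());
    }
}
```

Seen this way, the three hops amount to a single lookup of one property with a default of false; the indirection comes from the field being declared in one type, read in a second, and queried through a third.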