网站开发mvc架构,昆山网站建设哪家便宜,建设网站教程视频下载,中国建设工程有限公司文章目录 一. RecordWriter封装数据并发送到网络1. 数据发送到网络的具体流程2. 源码层面2.1. Serializer的实现逻辑a. SpanningRecordSerializer的实现b. SpanningRecordSerializer中如何对数据元素进行序列化 2.2. 将ByteBuffer中间数据写入BufferBuilder 二. BufferBuilder申… 文章目录 一. RecordWriter封装数据并发送到网络1. 数据发送到网络的具体流程2. 源码层面2.1. Serializer的实现逻辑a. SpanningRecordSerializer的实现b. SpanningRecordSerializer中如何对数据元素进行序列化 2.2. 将ByteBuffer中间数据写入BufferBuilder 二. BufferBuilder申请资源并创建1. ChannelSelectorRecordWriter创建BufferBuilder2. BroadcastRecordWriter创建BufferBuilder 一. RecordWriter封装数据并发送到网络
1. 数据发送到网络的具体流程
RecordWriter对接入的StreamRecord数据进行序列化并等待下游任务消费的过程整个过程细节如下。 StreamRecord通过RecordWriterOutput写入RecordWriter并在RecordWriter中通过RecordSerializer组件将StreamRecord序列化为ByteBuffer数据格式。 RecordWriter向ResultPartition申请BufferBuilder对象用于构建BufferConsumer对象将序列化后的二进制数据存储在申请到的Buffer中。ResultPartition会向LocalBufferPool申请MemorySegment内存块用于存储Buffer数据。 BufferBuilder中会不断接入ByteBuffer数据直到将BufferBuilder中的Buffer空间占满此时会申请新的BufferBuilder继续构建BufferConsumer数据集。 Buffer构建完成后会调用flushTargetPartition()方法让ResultPartition向下游输出数据此时会通知NetworkSequenceViewReader组件开始消费ResultSubPartition中的BufferConsumer对象。 当BufferConsumer中Buffer数据被推送到网络后回收BufferConsumer中的MemorySegment内存空间继续用于后续的消息处理。 2. 源码层面
接下来我们从源码的角度了解RecordWriter具体处理数据的逻辑。在RecordWriterOutput中调用pushToRecordWriter方法将数据写出。 通过recordWriter.emit(serializationDelegate)方法将数据元素发送到RecordWriter中进行处理。主要逻辑如下 序列化数据为ByteBuffer二进制数据并缓存在SpanningRecordSerializer.serializationBuffer对象中。将序列化器生成的中间数据复制到指定分区中实际上就是将ByteBuffer数据复制到BufferBuiler对象中。如果BufferBuiler中存储了完整的数据元素就会清空序列化器的中间数据因为序列化器中累积的数据不宜过大。 protected void emit(T record, int targetSubpartition) throws IOException { checkErroneous(); targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); if (flushAlways) { targetPartition.flush(targetSubpartition); }
}protected void emit(T record, int targetChannel) throws IOException, InterruptedException {checkErroneous();// 数据序列化serializer.serializeRecord(record);// 将序列化器中的数据复制到指定分区中if (copyFromSerializerToTargetChannel(targetChannel)) {// 清空序列化器serializer.prune();}
}2.1. Serializer的实现逻辑
接着了解如何将序列化器中的数据转换成Buffer并存储到ResultPartiton中最终将数据发送到下游。
a. SpanningRecordSerializer的实现
SpanningRecordSerializer实现将序列化后的BytesBuffer数据写入BufferBuilder。 SpanningRecordSerializer对象主要包含了DataOutputSerializer serializationBuffer和ByteBuffer dataBuffer两个成员变量。 DataOutputSerializer可以将数据转换成二进制格式并存储在byte[]数组中。在serialization中会调用serializationBuffer.wrapAsByteBuffer()方法将serializationBuffer中生成的byte[]数组转换成ByteBuffer数据结构并赋值给dataBuffer对象。ByteBuffer是Java NIO中用于对二进制数据进行操作的Buffer接口底层有DirectByteBuffer和HeapByteBuffer等实现通过ByteBuffer提供的方法可以轻松实现对二进制数据的操作。 b. SpanningRecordSerializer中如何对数据元素进行序列化
SpanningRecordSerializer.serializeRecord()方法主要逻辑如下。 1清理serializationBuffer的中间数据实际上就是将byte[]数组的position参数置为0。 2设定serialization buffer的初始容量默认不小于4。 3将数据元素写入serializationBuffer的bytes[]数组。所有数据元素都实现了IOReadableWritable接口可以直接将数据对象转换为二进制格式 4获取serializationBuffer的长度信息并写入serializationBuffer。 5将serializationBuffer中的byte[]数据封装为java.io.ByteBuffer数据结构最终赋值到dataBuffer的中间结果中。 public void serializeRecord(T record) throws IOException {if (CHECKED) {if (dataBuffer.hasRemaining()) {throw new IllegalStateException(Pending serialization of previous record.);}}// 首先清理serializationBuffer中的数据serializationBuffer.clear();// 设定serialization buffer数量serializationBuffer.skipBytesToWrite(4);// 将record数据写入serializationBufferrecord.write(serializationBuffer);// 获取serializationBuffer的长度信息并记录到serializationBuffer对象中int len serializationBuffer.length() - 4;serializationBuffer.setPosition(0);serializationBuffer.writeInt(len);serializationBuffer.skipBytesToWrite(len);// 对serializationBuffer进行wrapp处理转换成ByteBuffer数据结构dataBuffer serializationBuffer.wrapAsByteBuffer();
}Flink 1.12版本中RecordWriter就提供了serializeRecord的能力没有单拎出来实现。 2.2. 将ByteBuffer中间数据写入BufferBuilder
首先BufferBuilder用于构建完整的Buffer数据。在copyFromSerializerToTargetChannel()方法中实现了将RecordSerializer中的ByteBuffer中间数据写入BufferBuilder的逻辑 对序列化器进行Reset操作重置初始化位置。将序列化器的ByteBuffer中间数据写入BufferBuilder。判断当前BufferBuilder是否构建了完整的Buffer数据完成BufferBuilder中Buffer的构建。判断SerializationResult中是否具有完整的数据元素如果是则将pruneTriggered置为True然后清空当前的BufferBuilder并跳出循环。创建新的bufferBuilder继续从序列化器中将中间数据复制到BufferBuilder中。指定flushAlways参数为True调用flushTargetPartition()方法将数据写入ResultPartition。为防止过度频繁地将数据写入ResultPartiton在RecordWriter中会有独立的outputFlusher线程在构造器中周期性地将构建出来的Buffer数据推送到ResultPartiton本地队列中存储默认延迟为100ms。 protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {// 对序列化器进行Reset操作初始化initial positionserializer.reset();// 创建BufferBuilderboolean pruneTriggered false;BufferBuilder bufferBuilder getBufferBuilder(targetChannel);// 调用序列化器将数据写入bufferBuilderSerializationResult result serializer.copyToBufferBuilder(bufferBuilder);// 如果SerializationResult是完整Bufferwhile (result.isFullBuffer()) {// 则完成创建Buffer数据的操作finishBufferBuilder(bufferBuilder);// 如果是完整记录则将pruneTriggered置为Trueif (result.isFullRecord()) {pruneTriggered true;emptyCurrentBufferBuilder(targetChannel);break;}// 创建新的bufferBuilder继续复制序列化器中的数据到BufferBuilder中bufferBuilder requestNewBufferBuilder(targetChannel);result serializer.copyToBufferBuilder(bufferBuilder);}checkState(!serializer.hasSerializedData(), All data should be written at once);// 如果指定的flushAlways则直接调用flushTargetPartition将数据写入ResultPartitionif (flushAlways) {flushTargetPartition(targetChannel);}return pruneTriggered;
}二. BufferBuilder申请资源并创建
1. ChannelSelectorRecordWriter创建BufferBuilder
在ChannelSelectorRecordWriter.getBufferBuilder()方法中定义了BufferBuilder的创建过程。
//1. targetChannel确认数据写入的分区ID与下游InputGate中的InputChannelID是对应的
//2.
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {//在ChannelSelectorRecordWriter中维护了//bufferBuilders[]数组用于存储创建好的BufferBuilder对象if (bufferBuilders[targetChannel] ! null) {return bufferBuilders[targetChannel];} else {//只有在无法从bufferBuilders[]中获取BufferBuilder时//才会调用requestNewBufferBuilder()方法创建新的BufferBuilder对象。return requestNewBufferBuilder(targetChannel);}
}requestNewBufferBuilder()方法逻辑如下 检查bufferBuilders[]的状态确保bufferBuilders[targetChannel]为空或者bufferBuilders[targetChannel].isFinished()方法返回值为True。调用targetPartition.getBufferBuilder()方法获取新的BufferBuilder这里的targetPartition就是前面提到的ResultPartition。在ResultPartition中会向LocalBufferPool申请Buffer内存空间用于存储序列化后的ByteBuffer数据。向targetPartition添加通过bufferBuilder构建的BufferConsumer对象bufferBuilder和BufferConsumer内部维护了同一个Buffer数据。BufferConsumer会被存储到ResultSubpartition的BufferConsumer队列中。将创建好的bufferBuilder添加至数组用于下次直接获取和构建BufferConsumer对象。 public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {checkState(bufferBuilders[targetChannel] null || bufferBuilders[targetChannel].isFinished());// 调用targetPartition获取BufferBuilderBufferBuilder bufferBuilder targetPartition.getBufferBuilder();// 向targetPartition中添加BufferConsumertargetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(),targetChannel);// 将创建好的bufferBuilder添加至数组bufferBuilders[targetChannel] bufferBuilder;return bufferBuilder;
}2. BroadcastRecordWriter创建BufferBuilder
在BroadcastRecordWriter内部创建BufferBuilder的过程中会将创建的bufferConsumer对象添加到所有的ResultSubPartition中实现将Buffer数据下发至所有InputChannel如下代码
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {checkState(bufferBuilder null || bufferBuilder.isFinished());BufferBuilder builder targetPartition.getBufferBuilder();if (randomTriggered) {targetPartition.addBufferConsumer(builder.createBufferConsumer(), targetChannel);} else {try (BufferConsumer bufferConsumer builder.createBufferConsumer()) {for (int channel 0; channel numberOfChannels; channel) {targetPartition.addBufferConsumer(bufferConsumer.copy(), channel);}}}bufferBuilder builder;return builder;
}以上步骤就是在RecordWriter组件中将数据元素序列化成二进制格式然后通过BufferBuilder构建成Buffer类型数据最终存储在ResultPartition的ResultSubPartition中。 这是从Task的层面了解数据网络传输过程下篇了解在TaskManager中如何构建底层的网络传输通道。