File Transfer vs Message Transfer and Framing Model
Posted by Yanchao MURONG on
2019-07-16
In the last post, I illustrated a simple way to address sticky
and lost packets during socket transfer, which works for small
messages. In this post, I would like to draw our attention to file
transfer, which has a lot in common with normal message transfer
but also has its own particularities that can lead to new
problems.
Difference between File Transfer and Message Transfer
The data size of a file can be far larger than that of a normal
message... Well, this one difference is enough to keep us busy, since it
is the most important one and should not be underestimated once we look
at the problems it causes.
- As there is a great amount of data to transfer, if we transfer it
all at once (let's say it takes 10 minutes), the next message cannot be
sent until those 10 minutes have passed, as the socket is fully occupied
by this big file.
- If we get frustrated and want to cancel the transfer, things get
worse: the server is still waiting for the whole file to arrive, so the
whole socket stays blocked.
- Even if we only have one big file to transfer, the solution
introduced in my last post needs a large buffer, which can obviously
lead to an out-of-memory error if we are talking about files of many
gigabytes.
- If the network crashes during the transfer, we have no idea which
part has been transferred and which part has not, which forces us to
retransfer the whole file.
How can we address all of them in an elegant way? Framing comes to
the rescue.
What is a Frame
A frame is a simple container for a single network packet. In our
case, it can simply be a fixed-size container that carries a part of
the whole file or message, plus the supplementary information we need
to recombine the parts on reception.
Imagine the following frame data structure:
<Frame: capacity(64KB)>
<{
    Length:     2 Bytes [0~65535],   // size of payload
    Type:       1 Byte  [-128~127],  // type id
    Flags:      1 Byte  [00000000],  // encryption id
    Identifier: 1 Byte  [1~255],     // parent packet id
    Other:      1 Byte  [00000000],  // other things
    Payload:    X Bytes [X,X,X...]   // payload data
}>
In this data structure, the fixed header part occupies 6 bytes.
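To make the 6-byte layout concrete, here is a minimal sketch of how such a header could be written to and read back from a byte array. The class name FrameHeaderSketch is hypothetical and not part of the original code, and the big-endian byte order is an assumption, since the layout above does not specify one.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not part of the original code base.
final class FrameHeaderSketch {
    static final int HEADER_LENGTH = 6;

    final int length;     // payload size, unsigned 16 bits (0..65535)
    final byte type;      // frame type id
    final byte flags;     // encryption id
    final int identifier; // parent packet id, unsigned 8 bits (1..255)
    final byte other;     // reserved

    FrameHeaderSketch(int length, byte type, byte flags, int identifier, byte other) {
        if (length < 0 || length > 0xFFFF) {
            throw new IllegalArgumentException("length out of range: " + length);
        }
        if (identifier < 1 || identifier > 255) {
            throw new IllegalArgumentException("identifier out of range: " + identifier);
        }
        this.length = length;
        this.type = type;
        this.flags = flags;
        this.identifier = identifier;
        this.other = other;
    }

    // Serializes the header in the field order of the layout above.
    byte[] encode() {
        // Assumption: big-endian, which is ByteBuffer's default byte order.
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH);
        buf.putShort((short) length); // bytes 0-1
        buf.put(type);                // byte 2
        buf.put(flags);               // byte 3
        buf.put((byte) identifier);   // byte 4
        buf.put(other);               // byte 5
        return buf.array();
    }

    static FrameHeaderSketch decode(byte[] header) {
        if (header.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("header too short: " + header.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(header);
        int length = buf.getShort() & 0xFFFF; // undo sign extension
        byte type = buf.get();
        byte flags = buf.get();
        int identifier = buf.get() & 0xFF;    // undo sign extension
        byte other = buf.get();
        return new FrameHeaderSketch(length, type, flags, identifier, other);
    }
}
```

Note that Java has no unsigned short or byte, so the masks with 0xFFFF and 0xFF are what keep a length of 65535 or an identifier of 200 from turning negative on decode.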
The Type attribute could be:
- TYPE_PACKET_HEADER: used as the first frame of a big packet (file or
message) to send. It carries the information about the whole packet
that is needed to recombine it later. Typically, we could have the
packet length [5 bytes] and the packet type [1 byte]. It is worth
mentioning that the packet type lets us determine whether we are
receiving a file or a message.
- TYPE_PACKET_ENTITY: the entity frame, made of the frame header part
and the frame entity part, which contains part of the binary data
(file or message).
- TYPE_COMMAND_SEND_CANCEL: a special type of frame which indicates
that a packet transfer has been canceled.
...
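As an illustration of the header frame body described above, the following sketch packs a packet length into 5 bytes followed by a 1-byte packet type. The class name, the two packet type values and the most-significant-byte-first order are all assumptions made for this example; the original code may encode these fields differently.

```java
// Hypothetical helper for illustration only; not part of the original code base.
final class PacketHeaderBodySketch {
    static final int BODY_LENGTH = 6;
    // Largest packet length that fits in 5 bytes (about 1 TiB).
    static final long MAX_PACKET_LENGTH = (1L << 40) - 1;

    // Assumed type values; the original post does not give them.
    static final byte PACKET_TYPE_MESSAGE = 1;
    static final byte PACKET_TYPE_FILE = 2;

    static byte[] encode(long packetLength, byte packetType) {
        if (packetLength < 0 || packetLength > MAX_PACKET_LENGTH) {
            throw new IllegalArgumentException("packet length out of range: " + packetLength);
        }
        byte[] body = new byte[BODY_LENGTH];
        for (int i = 0; i < 5; i++) {
            // Most significant byte first.
            body[i] = (byte) (packetLength >>> (8 * (4 - i)));
        }
        body[5] = packetType;
        return body;
    }

    static long decodeLength(byte[] body) {
        long length = 0;
        for (int i = 0; i < 5; i++) {
            length = (length << 8) | (body[i] & 0xFFL);
        }
        return length;
    }

    static byte decodeType(byte[] body) {
        return body[5];
    }
}
```

Five bytes are worth the extra byte over a 4-byte int: a signed int stops at 2 GiB, while 40 bits cover files of many gigabytes.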
File Transfer Packet and Framing
Sending Frame logic
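Whenever we want to send a file, we can wrap it in a FileSendPacket.
Note that it uses an InputStream, so that the file is streamed instead
of being loaded into an unnecessary buffer. The sending logic that takes
a packet and splits it into frames looks like the following.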
public boolean requestTakePacket() {
    synchronized (this) {
        if (nodeSize >= 1) {
            return true;
        }
    }
    SendPacket packet = provider.takePacket();
    if (packet != null) {
        // an identifier will be generated and put in each frame header at the 5th byte position
        short identifier = generateIdentifier();
        SendHeaderFrame frame = new SendHeaderFrame(identifier, packet);
        // append the first sendHeaderFrame
        appendNewFrame(frame);
    }
    synchronized (this) {
        return nodeSize != 0;
    }
}

public IoArgs fillData() {
    Frame currentFrame = getCurrentFrame();
    if (currentFrame == null) {
        return null;
    }

    try {
        // if sendHeaderFrame, then put its header and body to args
        // if sendEntityFrame, then put its header and body from inputstream to args
        if (currentFrame.handle(args)) {
            // currentFrame has been consumed
            Frame nextFrame = currentFrame.nextFrame();
            // after a headerFrame the nextFrame is an entityFrame;
            // after an entityFrame the nextFrame is still an entityFrame
            if (nextFrame != null) {
                appendNewFrame(nextFrame);
            } else if (currentFrame instanceof SendEntityFrame) {
                // the last sendEntityFrame
                provider.completedPacket(((SendEntityFrame) currentFrame).getPacket(), true);
            }
            // popCurrentFrame
            // if nodeSize == 0 then requestTakePacket
            popCurrentFrame();
        }
        return args;
    } catch (IOException e) {
        e.printStackTrace();
    }

    return null;
}
Then we write the intermediate buffer to the socket channel.
try {
    // writeTo operation to consume args
    if (args == null) {
        processor.onConsumeFailed(null, new IOException("ProvideIoArgs is null."));
    } else {
        int count = args.writeTo(socketChannel);
        if (count == 0) {
            System.out.println("Current write zero data!");
        }
        ...
The requestSend method registers for the writable event again. Once
the channel becomes writable again, fillData() is called once more to
build the next SendEntityFrame and fill its data into the IoArgs, until
the whole packet has been split into frames.
In SendEntityFrame, an unConsumeEntityLength field tracks the
remaining packet length still to be read from the FileInputStream, so
that we know when to stop building the next frame.
@Override
protected int consumeBody(IoArgs args) throws IOException {
    if (packet == null) {
        // current frame has been stopped, fill fake data
        return args.fillEmpty(bodyRemaining);
    }
    return args.readFrom(channel);
}
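The role of the remaining-length counter can be shown in isolation. The sketch below is not the original SendEntityFrame; it is a simplified, hypothetical splitter that reads an InputStream in payload-sized pieces and stops once the counter reaches zero. The names EntityFrameSplitterSketch and FrameSink are assumptions introduced for this example.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper for illustration only; not part of the original code base.
final class EntityFrameSplitterSketch {
    // Largest payload a 2-byte frame length can describe.
    static final int MAX_PAYLOAD = 65535;

    // Receives one frame payload at a time, e.g. to write it to a socket.
    interface FrameSink {
        void accept(byte[] payload) throws IOException;
    }

    // Splits packetLength bytes of the stream into payloads of at most
    // maxPayload bytes and returns the number of payloads produced.
    static int split(InputStream in, long packetLength, int maxPayload, FrameSink sink)
            throws IOException {
        long unconsumed = packetLength; // plays the role of unConsumeEntityLength
        int frames = 0;
        while (unconsumed > 0) {
            int size = (int) Math.min(unconsumed, maxPayload);
            byte[] payload = new byte[size];
            int filled = 0;
            // A single read() may return fewer bytes than requested, so loop.
            while (filled < size) {
                int n = in.read(payload, filled, size - filled);
                if (n < 0) {
                    throw new EOFException("stream ended with "
                            + (unconsumed - filled) + " bytes missing");
                }
                filled += n;
            }
            sink.accept(payload);
            unconsumed -= size;
            frames++;
        }
        return frames;
    }
}
```

Only one payload is held in memory at a time, which is the point of streaming from the InputStream: a file of 150000 bytes, for instance, goes out as two full payloads of 65535 bytes and a last one of 18930 bytes.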
On the receiving side, we just need to read the binary data from the
socket channel frame by frame. By reading the frame type, we know
whether it is a header frame or an entity frame.
public static AbsReceiveFrame createInstance(IoArgs args) {
    byte[] buffer = new byte[Frame.FRAME_HEADER_LENGTH];
    args.writeTo(buffer, 0);
    // the type id is the third byte of the frame header
    byte type = buffer[2];
    switch (type) {
        case Frame.TYPE_COMMAND_SEND_CANCEL:
            return new CancelReceiveFrame(buffer);
        case Frame.TYPE_PACKET_HEADER:
            return new ReceiveHeaderFrame(buffer);
        case Frame.TYPE_PACKET_ENTITY:
            return new ReceiveEntityFrame(buffer);
        default:
            throw new UnsupportedOperationException("unsupported frame type" + type);
    }
}
...
/**
 * this is the frame header handling logic:
 * build a new frame with the help of the frame header bytes (6 bytes)
 *
 * @param args
 * @return
 */
private Frame buildNewFrame(IoArgs args) {
    AbsReceiveFrame frame = ReceiveFrameFactory.createInstance(args);
    if (frame instanceof CancelReceiveFrame) {
        cancelReceivePacket(frame.getBodyIdentifier());
        return null;
    } else if (frame instanceof ReceiveEntityFrame) {
        WritableByteChannel channel = getPacketChannel(frame.getBodyIdentifier());
        ((ReceiveEntityFrame) frame).bindPacketChannel(channel);
    }
    return frame;
}
The first time we read a HeaderFrame, we build an outputStream and
store it in a packetMap. Then, when an EntityFrame arrives, we look up
the outputStream registered under the frame's identifier and write the
frame body to it.
if (frameTemp == null) {
    Frame temp;
    do {
        temp = buildNewFrame(args);
    } while (temp == null && args.remained());

    if (temp == null) {
        return;
    }

    frameTemp = temp;
    if (!args.remained()) {
        return;
    }
}

Frame currentFrame = frameTemp;
do {
    try {
        // read frame body
        // if header frame then write header content to header frame property
        // if entity frame then write to channel
        if (currentFrame.handle(args)) {
            if (currentFrame instanceof ReceiveHeaderFrame) {
                ReceiveHeaderFrame headerFrame = (ReceiveHeaderFrame) currentFrame;
                // build packet with header frame body information
                ReceivePacket packet = provider.takePacket(headerFrame.getPacketType(),
                        headerFrame.getPacketLength(),
                        headerFrame.getPacketHeaderInfo()
                );
                // construct packetModel from packet and add it to packetMap
                appendNewPacket(headerFrame.getBodyIdentifier(), packet);
            } else if (currentFrame instanceof ReceiveEntityFrame) {
                completeEntityFrame((ReceiveEntityFrame) currentFrame);
            }
            frameTemp = null;
            break;
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
} while (args.remained());
...
private void appendNewPacket(short identifier, ReceivePacket packet) {
    PacketModel model = new PacketModel(packet);
    packetMap.put(identifier, model);
}
...
/**
 * a data model to manage packet and channel couple for later recovery
 */
static class PacketModel {
    final ReceivePacket packet;
    final WritableByteChannel channel;
    volatile long unreceivedlength;

    PacketModel(ReceivePacket<?, ?> packet) {
        this.packet = packet;
        // packet.open() will create an outputStream via ReceivePacket
        this.channel = Channels.newChannel(packet.open());
        this.unreceivedlength = packet.length();
    }
}
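The bookkeeping above boils down to a map from frame identifier to a pending packet. As a minimal, self-contained sketch of that idea (not the original receiver), the class below reassembles interleaved frames in memory and handles a cancel frame by dropping the pending entry. FrameDemuxSketch and its method names are hypothetical, and a ByteArrayOutputStream stands in for the file output stream a real receiver would use.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the original code base.
final class FrameDemuxSketch {
    private static final class Pending {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        long unreceived; // bytes still expected for this packet

        Pending(long length) {
            this.unreceived = length;
        }
    }

    private final Map<Integer, Pending> packetMap = new HashMap<>();
    private final Map<Integer, byte[]> completed = new HashMap<>();

    // A header frame announces a new packet and its total length.
    void onHeaderFrame(int identifier, long packetLength) {
        if (packetLength == 0) {
            completed.put(identifier, new byte[0]); // nothing more to wait for
            return;
        }
        packetMap.put(identifier, new Pending(packetLength));
    }

    // An entity frame carries the next chunk of the packet it names.
    void onEntityFrame(int identifier, byte[] payload) {
        Pending pending = packetMap.get(identifier);
        if (pending == null) {
            return; // unknown or canceled packet: drop the payload
        }
        pending.out.write(payload, 0, payload.length);
        pending.unreceived -= payload.length;
        if (pending.unreceived <= 0) {
            packetMap.remove(identifier);
            completed.put(identifier, pending.out.toByteArray());
        }
    }

    // A cancel frame discards whatever was received so far.
    void onCancelFrame(int identifier) {
        packetMap.remove(identifier);
    }

    boolean isPending(int identifier) {
        return packetMap.containsKey(identifier);
    }

    byte[] completedPacket(int identifier) {
        return completed.get(identifier);
    }
}
```

Because every entity frame names its parent packet, frames of a small message can be interleaved with frames of a big file on the same socket, and a cancel frame only affects the packet it names.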
Conclusion
To sum up, with the help of framing, the server is able to
distinguish different packets arriving in parallel. Depending on the
type of each received frame, we can define different ways of handling
it. Splitting a large file transfer into small chunks of data is an
effective way to address the problems we could face on a long-lived
connection.