Deal with Mina's package sticking and package breaking (with complete instance, client and server)

1. What is broken package and sticky package?

Before talking about breaking and sticking packets, we first talk about message protection boundary and no message protection boundary.
1. Protected message boundary means that the transmission protocol transmits data as an independent message on the Internet, and the receiver can only receive independent messages. That is to say, there is a protected message boundary, and the receiver can only receive one packet sent by the sender at a time
2. Flow oriented has no message protection boundary. If the sender sends data continuously, the receiver may receive two or more packets in one receive operation.

While tcp is flow oriented, it needs to deal with the message boundary at the receiving end.

The receiver may encounter the following four situations when receiving data

A. Receive dataA first and then dataB
B. First receive part of data a, then the rest of data a and all of data B
C. First received all data of dataA and part of data of dataB, then received the rest of data of dataB
D. All data of dataA and dataB are received at one time

A is normal, no sticking or broken package.
B is broken package + sticking package.
C is sticking bag + broken bag.
D is adhesive package.

2. How to deal with the problems of package sticking and package breaking in Mina

In the Mina framework, there is a cumulative protocol decoder, which is used to deal with packet sticking and packet breaking. The return value of doDecode() is important.

A. When your doDecode() method returns true, the CumulativeProtocolDecoder's decode() method will first determine whether you have read data from the internal IoBuffer buffer in the doDecode() method. If not, an illegal status exception will be thrown. That is to say, if your doDecode() method returns true, you have consumed this data (equivalent to a complete one in the chat room The message has been read), which means that you must have consumed the data of the internal IoBuffer buffer (even one byte of data). If the validation is passed, the cumulative protocol decoder will check whether there is any data in the buffer that has not been read. If there is, continue to call the doDecode() method, and if not, stop calling the doDecode() method until new data is buffered.

B. When your doDecode() method returns false, the CumulativeProtocolDecoder will stop calling the doDecode() method. However, if the data has not been read this time, the IoBuffer buffer with the remaining data will be saved to the IoSession, so that the next time the data arrives, it can be extracted and merged from the IoSession. If it is found that all the data has been read this time, clear the IoBuffer buffer buffer (let the parent class receive the next package). In short, when you think the read data is enough to decode, return true, otherwise return false. In fact, the most important work of this cumulative protocol decoder is to help you complete the data accumulation, because this work is very cumbersome. In other words, return true, the cumulative protocol decoder will call the decoder again and send the remaining data (that is, it will send the remaining data to doDecode() for processing, and the remaining data is the data of retaining()), return false and don't process the remaining data, (don't send the remaining data to doDecode()) when there are new data packets coming, the remaining data and new data will be processed And then we call the decoder.

A complete example is attached below
1. Message format
Package header + message length (int) + message content (JSON string) + package tail. The package header and package tail are hexadecimal strings 00 aa bb cc, which are converted into byte array 0, - 86, - 69, - 52. The following complete example has the client and the server. The data will be parsed, the message content (JSON string) will be obtained and printed. The message is in the server in the form of byte array , passed between clients.

Server code

package com.my.mina;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.Date;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

/**
 * mina Service side of
 * 
 * @author linbin
 *
 */
public class MinaService {

    public static void main(String[] args) {

        // Create a non blocking server Socket
        IoAcceptor acceptor = new NioSocketAcceptor();
        // Add log filter
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));// Custom de encoder

        // Set Handler
        acceptor.setHandler(new DemoServerHandler());
        // Set cache size for read data
        acceptor.getSessionConfig().setReadBufferSize(2048);
        // Read / write channel enters idle state without operation within 10 seconds
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        try {
            // Binding port
            acceptor.bind(new InetSocketAddress(20000));
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Startup service");
    }

    /**
     * @ClassName: DemoServerHandler
     * @Description: Responsible for creating and listening session objects and messages
     * @author chenzheng
     * @date 2016-12-9 3:57:11 PM
     */
    private static class DemoServerHandler extends IoHandlerAdapter {

        // Server and client create connection
        @Override
        public void sessionCreated(IoSession session) throws Exception {
            System.out.println("Server and client create connection...");
            super.sessionCreated(session);
        }

        @Override
        public void sessionOpened(IoSession session) throws Exception {
            System.out.println("Server client connection open...");
            super.sessionOpened(session);
        }

        // Receiving and processing of messages
        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            // TODO Auto-generated method stub
            super.messageReceived(session, message);// Acceptance of messages

            // Pass custom decoders pass arrays and parse arrays lose packets and break packets
            String a = (String) message;
            System.out.println("Data received:" + a);
            session.write(a);

        }

        // Call after sending message
        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            // TODO Auto-generated method stub
            super.messageSent(session, message);
            System.out.println("Server successfully sent message...");
        }

        // session close
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            // TODO Auto-generated method stub
            super.sessionClosed(session);
            System.out.println("Disconnect:");
        }
    }

}

Encoder


package com.my.mina;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

import java.nio.charset.Charset;

/**
 * Encoder
 * 
 */
public class ByteArrayEncoder extends ProtocolEncoderAdapter {

    private final Charset charset;

    public ByteArrayEncoder(Charset charset) {
        this.charset = charset;

    }

    /**
     * Send the data directly, data format, packet header + message length (int) + message content (json string) + packet tail, packet header and packet tail are hexadecimal strings 00 aa bb cc, which are converted into byte array 0,
     * -86, -69, -52 Four bytes
     *
     * @param session
     * @param message
     * @param out
     * @throws Exception
     */
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        // Imitate project, solve the problem of breaking package and sticking package
        String value = (message == null ? "" : message.toString());// Message value
        byte[] content = value.getBytes(charset);// Message content, byte array
        IoBuffer buf = IoBuffer.allocate(38 + content.length).setAutoExpand(true);// Buffer size 38 bytes plus character length
        buf.put(new byte[] { 0, -86, -69, -52 });// Fixed value at the beginning of the input package hex 00 aa bb cc, converted to byte array
        buf.putUnsignedInt(content.length);// int is 4 bytes, one byte is equal to 2 hexadecimal characters, so it has eight bits 00 00 0C and content length.
        buf.put(content);// Message content
        buf.put(new byte[] { 0, -86, -69, -52 });// Packet tail
        buf.flip();
        out.write(buf);// Write in
    }

}


Decoder, focus, solve the problem of Mina packet breaking and packet loss

package com.my.mina;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

import java.nio.charset.Charset;

/**
 * Custom decoder to ensure complete package read
 */
public class ByteArrayDecoder extends CumulativeProtocolDecoder {

    private final Charset charset;

    public ByteArrayDecoder(Charset charset) {
        this.charset = charset;

    }

    @Override
    protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput)
            throws Exception {
        // Packet loss processing
        if (ioBuffer.remaining() > 4)// Baotou is enough
        {
            ioBuffer.mark();// Mark the snapshot mark of the current position, so that subsequent reset operations can recover the position, starting with 0
            byte[] l = new byte[4];
            ioBuffer.get(l);// Read package header, 4 bytes
            if (ioBuffer.remaining() < 4)// Insufficient 4 bytes of content length, packet break
            {
                ioBuffer.reset();
                return false;//
            } else {// 4 byte array of content length is enough
                byte[] bytesLegth = new byte[4];// Content length
                ioBuffer.get(bytesLegth);// Read content length, int type, four bytes
                int len = MinaUtil.byteArrayToInt(bytesLegth);// How long is the content
                if (ioBuffer.remaining() < len)// Insufficient content, broken package
                {
                    ioBuffer.reset();
                    return false;//

                } else { // Enough message content

                    byte[] bytes = new byte[len];
                    ioBuffer.get(bytes, 0, len);
                    protocolDecoderOutput.write(new String(bytes, charset));// Read content and send

                    if (ioBuffer.remaining() < 4) {// Insufficient package
                        ioBuffer.reset();
                        return false;//

                    } else {// Enough package end
                        byte[] tails = new byte[4];
                        ioBuffer.get(tails);// Read packet end
                        if (ioBuffer.remaining() > 0)// Finally, if the package is stuck, the doDeocde() method will be called again to process the remaining data to the doDeocde() method
                        {
                            return true;
                        }

                    }
                }

            }

        }
        return false;// Break the package, or finish executing,

    }
}

Decode factory

package com.my.mina;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

import java.nio.charset.Charset;

/**
 * Custom encoder factory
 *
 */

public class ByteArrayCodecFactory implements ProtocolCodecFactory {

    private ByteArrayDecoder decoder;
    private ByteArrayEncoder encoder;

    public ByteArrayCodecFactory() {
        this(Charset.defaultCharset());
    }

    public ByteArrayCodecFactory(Charset charSet) {
        encoder = new ByteArrayEncoder(charSet);
        decoder = new ByteArrayDecoder(charSet);
    }

    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return decoder;
    }

    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return encoder;
    }

}

Note: the client and server need the same decoder, encoder and decomposing factory as the server.

Client core code


package com.example.mina.minaapplication.view;

import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import android.widget.Toast;

import com.example.mina.minaapplication.R;
import com.example.mina.minaapplication.mina.ByteArrayCodecFactory;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * Mina Client
 */
public class MainActivity extends Activity {
    /**
     * Thread pool, to avoid blocking the main thread, establish a connection with the server, create a single thread pool, and execute the thread pool as soon as possible
     */
    private static ExecutorService executorService = Executors.newSingleThreadExecutor();


    /**
     * Connection object
     */
    private NioSocketConnector mConnection;
    /**
     * session object
     */
    private IoSession mSession;
    /**
     * Address to connect to the server
     */
    private InetSocketAddress mAddress;

    private ConnectFuture mConnectFuture;


    public static final int UPADTE_TEXT = 1;
    /**
     * Information returned by the server
     */
    private TextView tvShow;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        tvShow = findViewById(R.id.tv_show);
        initConfig();
        connect();
        findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {//Send message data


            @Override
            public void onClick(View view) {
                if (mConnectFuture != null && mConnectFuture.isConnected()) {//Connect to the server
                    mConnectFuture.getSession().write("{\"id\":11,\"name\":\"ccc\"}");//Send json string
                }

            }
        });
    }

    /**
     * Initialize Mina configuration information
     */
    private void initConfig() {
        mAddress = new InetSocketAddress("192.168.0.1", 20000);//Connection address. This data can be changed to the IP and port number you want to connect
        mConnection = new NioSocketConnector();// Create connection
        // Set cache size for read data
        SocketSessionConfig socketSessionConfig = mConnection.getSessionConfig();
        socketSessionConfig.setReadBufferSize(2048);
        socketSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE, 4);//Set 4 seconds for no read / write operation to idle
        mConnection.getFilterChain().addLast("logging", new LoggingFilter());//logging filter
        mConnection.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));//Custom de encoder
        mConnection.setHandler(new DefaultHandler());//Set handler
        mConnection.setDefaultRemoteAddress(mAddress);//Set address


    }

    /**
     * Create connection
     */

    private void connect() {

        FutureTask<Void> futureTask = new FutureTask<>(new Callable<Void>() {
            @Override
            public Void call() {//

                try {
                    while (true) {
                        mConnectFuture = mConnection.connect();
                        mConnectFuture.awaitUninterruptibly();//Until he connects
                        mSession = mConnectFuture.getSession();//Get session object
                        if (mSession != null && mSession.isConnected()) {
                            Toast.makeText(MainActivity.this, "Successful connection", Toast.LENGTH_SHORT).show();
                            break;
                        }
                        Thread.sleep(3000);//Cycle every three seconds
                    }

                } catch (Exception e) {//Connection exception


                }
                return null;
            }
        });
        executorService.execute(futureTask);//Execute connection thread
    }


    /**
     * Mina handler for handling messages. Messages returned from the server are usually handled here
     */
    private class DefaultHandler extends IoHandlerAdapter {


        @Override
        public void sessionOpened(IoSession session) throws Exception {
            super.sessionOpened(session);

        }

        /**
         * Server side message received
         *
         * @param session
         * @param message
         * @throws Exception
         */
        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            Log.e("tag", "Server side message received:" + message.toString());

            Message message1 = new Message();
            message1.what = UPADTE_TEXT;
            message1.obj = message;
            handler.sendMessage(message1);
        }


        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {//Client enters idle state
            super.sessionIdle(session, status);

        }
    }

    /**
     * Update UI
     */
    private Handler handler = new Handler() {
        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
            switch (msg.what) {
                case UPADTE_TEXT:
                    String message = (String) msg.obj;
                    tvShow.setText(message);
                    break;
            }
        }
    };
}

Client screenshot:

Client screenshot

Server screenshot:

Server screenshot

The complete project code address of this article (welcome to star):
https://github.com/lb1207087645/Android-Mina-master

Reference resources:

Discussion on sticking and breaking of TCP packets and Solutions

Receive and process byte array by mini custom codec (solve the problem of sticking and missing packets in data transmission at the same time)

Author: be a different programmer
Links: https://www.jianshu.com/p/9eb87f321eda
Source: Jianshu
The copyright belongs to the author. For commercial reprint, please contact the author for authorization. For non-commercial reprint, please indicate the source.

Tags: Programming Session Apache codec Java

Posted on Wed, 29 Jan 2020 00:38:55 -0800 by kamasheto