C# MQTT: reconnecting after a disconnect

After connecting with MqttClient, if the server restarts, the client stops receiving messages for its subscribed topics unless it has a reconnection mechanism.

The MQTT library used is M2Mqtt.Net.dll.

Some observations

(1) If the server address cannot be resolved, the MqttClient constructor throws an exception and the object cannot be instantiated.
(2) If Connect fails, it throws an exception and IsConnected remains false.
(3) When the server drops the connection, the client's ConnectionClosed event fires and IsConnected is false.
(4) After reconnecting, the topics must be subscribed again by calling Subscribe.
(5) The first parameter of MqttClient.Subscribe is the array of topics and the second is the array of corresponding QoS levels. The two arrays must be the same length; otherwise an exception is thrown.
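
Observation (5) is easy to trip over when the topic list is edited by hand. The following is a minimal sketch of one way to keep the two arrays in step; SubscriptionList is a hypothetical helper, not part of M2Mqtt, and the QoS values 0, 1 and 2 are the standard MQTT levels.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of M2Mqtt): stores each topic together with
// its QoS level so the two arrays handed to Subscribe always have equal length.
public sealed class SubscriptionList
{
    private readonly List<string> _topics = new List<string>();
    private readonly List<byte> _qosLevels = new List<byte>();

    // Adds one topic/QoS pair and returns this instance to allow chaining.
    public SubscriptionList Add(string topic, byte qosLevel)
    {
        if (string.IsNullOrEmpty(topic))
            throw new ArgumentException("Topic must not be empty.", nameof(topic));
        if (qosLevel > 2)
            throw new ArgumentOutOfRangeException(nameof(qosLevel), "MQTT QoS is 0, 1 or 2.");

        _topics.Add(topic);
        _qosLevels.Add(qosLevel);
        return this;
    }

    // Fresh copies, in insertion order, ready to pass to Subscribe.
    public string[] Topics => _topics.ToArray();
    public byte[] QosLevels => _qosLevels.ToArray();
}
```

Assuming a list built with new SubscriptionList().Add("topic1", 0).Add("topic2", 0), the call would then look like _MqttClient.Subscribe(subs.Topics, subs.QosLevels).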

Reconnection process control

Main Code Implementation

(1) Thread body

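// Auto-reconnect loop: runs on a worker thread until connected or _ToClose is set
private void _TryContinueConnect()
{
    if (IsConnected) return;

    Thread retryThread = new Thread(new ThreadStart(delegate
    {
        while (_MqttClient == null || !_MqttClient.IsConnected)
        {
            if (_ToClose) break;

            // The client has not been instantiated yet (e.g. unresolvable address): build it
            if (_MqttClient == null)
            {
                _BuildClient();
                // Wait 3 seconds before the next attempt only if the build failed
                if (_MqttClient == null) Thread.Sleep(3000);
                continue;
            }

            try
            {
                _TryCount++;
                _Connect();
            }
            catch (Exception ce)
            {
                Debug.WriteLine("reconnect exception: " + ce.Message);
            }

            // Still not connected: wait 2 seconds before the next attempt
            if (!_MqttClient.IsConnected)
            {
                Thread.Sleep(2000);
            }
        }
    }));

    // Background thread, so a pending retry loop does not keep the process alive
    retryThread.IsBackground = true;
    retryThread.Start();
}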
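
One thing the thread body does not guard against is being started twice: if _TryContinueConnect were called again while a retry thread is still looping (for example from a second ConnectionClosed event), two threads would call Connect on the same client. A minimal sketch of a guard follows; RetryGate is a hypothetical name, and whether the event can actually fire more than once depends on the library version, so treat this as a precaution rather than a required fix.

```csharp
using System;
using System.Threading;

// Hypothetical guard (not part of M2Mqtt): lets only one retry loop run at a time.
public sealed class RetryGate
{
    private int _running; // 0 = idle, 1 = a retry loop is active

    // Atomically switches from idle to active.
    // Returns true for exactly one caller until Exit() is called.
    public bool TryEnter()
    {
        return Interlocked.CompareExchange(ref _running, 1, 0) == 0;
    }

    // Marks the retry loop as finished so a later disconnect can start a new one.
    public void Exit()
    {
        Interlocked.Exchange(ref _running, 0);
    }
}
```

In _TryContinueConnect, this would mean returning early when TryEnter() is false, and calling Exit() in a finally block at the end of the thread delegate.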

(2) Instantiation section

// Instantiate Client
private void _BuildClient()
{
    try
    {
        _MqttClient = new MqttClient(_MqttServer);
    }
    catch (Exception e)
    {
        Debug.WriteLine("build client error:" + e.Message);
        return;
    }

    // Bind the message-received event
    _MqttClient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;

    // Bind the connection-closed event
    _MqttClient.ConnectionClosed += (sender, e) =>
    {
        if (!_ToClose)
        {
            // Try to reconnect
            _TryContinueConnect();
        }
    };
}

(3) Connection section

// Initiate a connection and, if it succeeds, subscribe to the relevant topics
private void _Connect()
{
    if (String.IsNullOrEmpty(_MqttUsername))
    {
        _MqttClient.Connect(_MqttClientId);
    }
    else
    {
        _MqttClient.Connect(_MqttClientId, _MqttUsername, _MqttUserpass);
    }

    if (_MqttClient.IsConnected)
    {
        _MqttClient.Subscribe(new string[] { "topic1", "topic2" },
            new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
    }
}

This works well in practice, and the delay between attempts can be adjusted as needed.
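
As one possible way to adjust the delay, the fixed 2-second wait could grow with the number of failed attempts. The sketch below assumes a doubling schedule with a cap; RetryDelay and its default values (2000 ms base, 60000 ms cap) are illustrative choices, not something the original code or M2Mqtt prescribes.

```csharp
using System;

// Hypothetical helper: computes how long to wait before the next reconnect attempt.
public static class RetryDelay
{
    // attempt is 1-based. The delay doubles from baseMs on each attempt
    // and never exceeds maxMs.
    public static int ForAttempt(int attempt, int baseMs = 2000, int maxMs = 60000)
    {
        if (attempt < 1) attempt = 1;

        // Limit the shift so the 64-bit multiplication below cannot overflow.
        int shift = Math.Min(attempt - 1, 16);
        long delay = (long)baseMs << shift;

        return (int)Math.Min(delay, (long)maxMs);
    }
}
```

With this in place, Thread.Sleep(2000) in the retry loop could become Thread.Sleep(RetryDelay.ForAttempt(_TryCount)), with _TryCount reset to 0 once a connection succeeds.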

Tags: C#

Posted on Fri, 30 Aug 2019 22:10:34 -0700 by filn