MqttAndroidClient使用-程序员宅基地

技术标签: 网络  Android  

MQTT协议特点

MQTT是一个由IBM主导开发的物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。它的核心设计思想是开源、可靠、轻巧、简单,具有以下主要的几项特性:

  1. 非常小的通信开销(最小的消息大小为 2 字节);
  2. 支持各种流行编程语言(包括C,Java,Ruby,Python 等等)且易于使用的客户端;
  3. 支持发布 / 预定模型,简化应用程序的开发;
  4. 提供三种不同消息传递等级,让消息能按需到达目的地,适应在不稳定工作的网络传输需求

应项目要求使用MQTT协议实现客户端与服务端通信。Android端使用MqttAndroidClient实现MQTT通信:MqttAndroidClient

MqttAndroidClient配置

1.添加依赖

在项目根目录下的build.gradle中添加:

repositories {
    maven {
        url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
    }
}

在app目录下的build.gradle中添加:

dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

2.权限声明

在AndroidManifest.xml中添加:

    <uses-permission android:name="android.permission.WAKE_LOCK" />
    <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.READ_PHONE_STATE" />
    <uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />
    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_WIFI_STATE" />

3.配置服务

在AndroidManifest.xml中添加:

        <service android:name="org.eclipse.paho.android.service.MqttService" />

功能实现

一、简单实现

    private String clientId = "ExampleAndroidClient";
    private MqttAndroidClient mqttAndroidClient;
    private String host = "tcp://xx.xx.xx.xx";//MQTT服务器地址
    private MqttConnectOptions conOpt;
    private String name;//MQTT账号
    private String password;//MQTT密码
    private String topic;//订阅的标题


    //MQTT连接
    private void setMqtt() {
        clientId = clientId + System.currentTimeMillis();
        mqttAndroidClient = new MqttAndroidClient(this,host,clientId);
        conOpt = new MqttConnectOptions();
        // 清除缓存
        conOpt.setCleanSession(true);
        // 自动重连
        conOpt.setAutomaticReconnect(true);
        // 设置超时时间,单位:秒
        conOpt.setConnectionTimeout(10);
        // 心跳包发送间隔,单位:秒
        conOpt.setKeepAliveInterval(20);
        // 用户名
        conOpt.setUserName(name);
        // 密码
        conOpt.setPassword(password.toCharArray());
        mqttAndroidClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                Log.d(TAG, "connectionLost: 连接断开");
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                Log.d(TAG, "消息到达" + message.getPayload());
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });
        try {
            //进行连接
            mqttAndroidClient.connect(conOpt, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d(TAG, "onSuccess: 连接成功");
                    try {
                        //连接成功后订阅主题
                        mqttAndroidClient.subscribe(topic, 2);

                    } catch (MqttException e) {
                        e.printStackTrace();
                    }

                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.d(TAG, "onFailure: 连接失败");
                }
            });

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    public void publishMessage(String payload) {
        try {
            if (mqttAndroidClient.isConnected() == false) {
                mqttAndroidClient.connect();
            }

            MqttMessage message = new MqttMessage();
            message.setPayload(payload.getBytes());
            message.setQos(0);
            mqttAndroidClient.publish(topic, message, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.i(TAG, "publish succeed!");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.i(TAG, "publish failed!");
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, e.toString());
            e.printStackTrace();
        }
    }

二、通过service实现

1.新建MqttService

public class MqttService extends Service {

    public static final String TAG = MqttService.class.getSimpleName();

    private static MqttAndroidClient client;
    private MqttConnectOptions conOpt;
    private IGetMessageCallBack mIGetMessageCallBack;

    private String host = "tcp://xx.xx.xx.xx";
    private String userName = "admin";
    private String passWord = "password";
    private String topic;
    private String clientId = "AndroidClient_";

    @Override
    public void onCreate() {
        super.onCreate();
        init();
    }

    private void init() {
        clientId = clientId + System.currentTimeMillis();
        client = new MqttAndroidClient(this, host, clientId);
        conOpt = new MqttConnectOptions();
        client.setCallback(mqttCallback);
        // 清除缓存
        conOpt.setCleanSession(true);
        // 自动重连
        conOpt.setAutomaticReconnect(true);
        // 设置超时时间,单位:秒
        conOpt.setConnectionTimeout(10);
        // 心跳包发送间隔,单位:秒
        conOpt.setKeepAliveInterval(20);
        conOpt.setUserName(userName);
        conOpt.setPassword(passWord .toCharArray());
        doClientConnection();
    }

    @Override
    public IBinder onBind(Intent intent) {
        return new CustomBinder();
    }

    public class CustomBinder extends Binder {
        public MqttService getService(){
            return MqttService.this;
        }
    }

    public void setIGetMessageCallBack(IGetMessageCallBack iGetMessageCallBack) {
        this.mIGetMessageCallBack = iGetMessageCallBack;
    }

    /** 连接MQTT服务器 */
    private void doClientConnection() {
        if (!client.isConnected() && isConnectIsNormal()) {
            try {
                client.connect(conOpt, null, iMqttActionListener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

    }

    public static void publishMessage(String payload) {
        try {
            if (client.isConnected() == false) {
                client.connect();
            }

            MqttMessage message = new MqttMessage();
            message.setPayload(payload.getBytes());
            message.setQos(0);
            client.publish(topic, message,null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.i(TAG, "publish succeed!");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.i(TAG, "publish failed!");
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, e.toString());
            e.printStackTrace();
        }
    }

    // MQTT是否连接成功
    private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
            Log.d(TAG, "onSuccess: 连接成功");
            try {
                //连接成功后订阅主题
                client.subscribe(topic, 2);

            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
            exception.printStackTrace();
        }
    };

    // MQTT监听并且接受消息
    private MqttCallback mqttCallback = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            Log.d(TAG, "connectionLost: 连接断开");
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            String str = new String(message.getPayload());
            if (mIGetMessageCallBack != null){
                mIGetMessageCallBack.setMessage(str);
            }
            Log.d(TAG, "消息到达");
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {

        }
    };

    /** 判断网络是否连接 */
    private boolean isConnectIsNormal() {
        ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
                .getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager.getActiveNetworkInfo();
        if (info != null && info.isAvailable()) {
            String name = info.getTypeName();
            Log.i(TAG, "MQTT当前网络名称:" + name);
            return true;
        } else {
            Log.i(TAG, "MQTT 没有可用网络");
            return false;
        }
    }

    @Override
    public void onDestroy() {
        stopSelf();
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
        super.onDestroy();
    }

2.新建IGetMessageCallBack,接收服务端发送的消息

public interface IGetMessageCallBack {
    public void setMessage(String message);
}

3.新建MyServiceConnection

public class MyServiceConnection implements ServiceConnection {

    private MqttService mqttService;
    private IGetMessageCallBack iGetMessageCallBack;

    @Override
    public void onServiceConnected(ComponentName name, IBinder service) {
        mqttService =  ((MqttService.CustomBinder)service).getService();
        mqttService.setIGetMessageCallBack(iGetMessageCallBack);
    }

    @Override
    public void onServiceDisconnected(ComponentName name) {

    }

    public MqttService getMqttService() {
        return mqttService;
    }

    public void setIGetMessageCallBack(IGetMessageCallBack iGetMessageCallBack) {
        this.iGetMessageCallBack = iGetMessageCallBack;
    }

4.使用

public class MainActivity extends AppCompatActivity implements IGetMessageCallBack {


    private MyServiceConnection myServiceConnection;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        myServiceConnection = new MyServiceConnection();
        myServiceConnection.setIGetMessageCallBack(this);
        startMqttService();
        //向服务器发送消息
        //MqttService.publishMessage("Hello!");
    }

    
    private void startMqttService() {
        Intent intent = new Intent(this, MqttService.class);
        bindService(intent, myServiceConnection, Context.BIND_AUTO_CREATE);
    }

    @Override
    public void setMessage(String message) {
        Log.d(TAG, "获取的消息" + message);
    }
}

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/cbxboy163/article/details/127634174

智能推荐

javax.imageio.IIOException: Not a JPEG file: starts with 0x47 0x49-程序员宅基地

文章浏览阅读7.8k次。java处理图片时出现异常javax.imageio.IIOException: Not a JPEG file: starts with 0x47 0x49at com.sun.imageio.plugins.jpeg.JPEGImageReader.readImageHeader(Native Method)at com.sun.imageio.plugins.jpeg.JPEGI_javax.imageio.iioexception: not a jpeg file: starts with 0x52 0x49

代发外链哪家好?-程序员宅基地

文章浏览阅读343次,点赞11次,收藏6次。与其授人以鱼不如授人以渔,在这里说说如何选择好的外链商,可以先找一下你要发的这家外链商的口碑,了解其专业水平,这倒是最基本的了,说到底这些东西说得难听点都是可以伪造的,所以最重要的,是要了解外链的作用。外链可以说是网站外部优化最重要的组成部分,一个网站的外链建设对于网站网站优化是至关重要的,选择到一家好的外链商可以说成功了一半,毕竟不是谁都有外链资源。外链最重要的作用毫无疑问,就是提升网站的关键词排名,不能提升排名的外链可以说没有意义,有人就会说了,但外链的作用不是日积月累的吗?

使用Apache的ab工具进行压力测试_用apache中的ab测试接口压力中的时延是什么-程序员宅基地

文章浏览阅读442次。ab命令原理 Apache的ab命令模拟多线程并发请求,测试服务器负载压力,也可以测试nginx、lighthttp、IIS等其它Web服务器的压力。 Apache附带的ab工具(使用的PHP环境是WAMP集成环境,ab工具位于D:\wamp\bin\apache\Apache2.2.21\bin)非常容易使用。ab命令对发出负载的计算机要求很低,既不会占用很多CPU,也不会占用太多的内存,但_用apache中的ab测试接口压力中的时延是什么

falsk框架中安装flask-mysqldb报错解决方案_flask_mysqldb安装失败windows-程序员宅基地

文章浏览阅读1k次。我的是py37版本,无法直接安装flask-mysqldb。下载完成之后直接在控制台本地安装。下载mysqlclient。_flask_mysqldb安装失败windows

手把手教你启用Win10的Linux子系统(超详细)_win10自带linux子系统怎么用-程序员宅基地

文章浏览阅读10w+次,点赞143次,收藏775次。今天为大家介绍如何才能启用Windows10下的Linux子系统,废话不多说,直接看步骤:启用开发者模式打开设置 点击更新和安全 点击开发者选项 启用开发人员模式 更改系统功能使用win+X快捷键调出系统管理菜单后点击应用和功能,然后拉到底下,选择程序和功能 选中应用或关闭Windows功能 勾选适用于Linux的Windows子系统,然后确认并重启..._win10自带linux子系统怎么用

SCI必备Latex编写工具(texlive+texstudio的安装及使用---超详细)-程序员宅基地

文章浏览阅读1.9w次,点赞46次,收藏113次。前言满纸荒唐言,一把辛酸泪。都云作者痴,谁解其中味?只有我的电脑知道为了安装Latex排版的工具花了多少功夫,查了多少资料。斗争之旅我的电脑上很早就有老师给的CTEX安装包,并且安装的时候还是百度了一下安装步骤,生怕到时候会有问题。结果等到我要开始写SCI论文的时候才发现,咦? 这是啥错误undefined control sequence\begin{document},改了路径啥的好多操作都不行,于是卸载,结果发现卸载都卸载不掉,文件都删除不了,后面我慢慢删也是删完了,后面还发现居然还修改_texlive

随便推点

超文本标记语言_head表示超文本文件头信息的结束-程序员宅基地

文章浏览阅读6.2k次。超文本标记语言百科名片超文本标记语言,即HTML(Hypertext Markup Language),是用于描述网页文档的一种标记语言。 查看精彩图册目录基本介绍由来定义语言特点编辑发展历史超文本标记语言可扩展超文本标记语言整体结构文件头部内容文件主体内容字符集_head表示超文本文件头信息的结束

h265硬解码和软解码_h265能通过gpu解码-程序员宅基地

文章浏览阅读2k次。h.265解码库,支持GPU和CPU1.初始化PlayerSDK_Init(CallBack callBackFunc,int nType);callBackFunc 回调函数nType 视频解码方式 CPU解码或者GPU解码2.播放接口PlayerSDK_Play(char* URL, long hWnd, int nType);URL 播放地址hWnd 播放句柄nType 播放类型接口返回播放句柄号3.停止播放接口Play_h265能通过gpu解码

stable diffusion(1): webui的本地部署(windows)_sd webui torch版本-程序员宅基地

文章浏览阅读2.1k次。有一个坑一直没过去,就是如果整体环境没完全装好,但是使用我自己提前创建的python虚拟环境来启动SD启动脚本stable-diffusion-webui/webui-user.bat,期间会因为某些原因(比如没梯子东西下载不下来)启动失败,但是第二次启动时就会报没有pip模块的错误,我就只能重新创建python虚拟环境,再装一遍包,这个过程很漫长很浪费时间,所以一定跟着我的脚步,一步不要落下的走,心急吃不了热豆腐。如果没有梯子,这里很慢或者根本过不去,所以参考。三、修改url地址(梯子强可不改)_sd webui torch版本

CTFSHOW做题记录_ctfshow 龙猫-程序员宅基地

文章浏览阅读491次。CTFSHOW做题记录**CTFSHOW做题记录1**(菜菜的我要写日记啦,欢迎大佬指导)**密码学签到1给出“}wohs.ftc{galf”并且提示倒叙。**解题思路:没看提示的时候乍一看以为是栅栏密码,还想着用在线解密去做,但是定睛一看不对劲,再看题目原来就是倒叙。只需要反着来就好啦。**答案:flag{ctf.show}**今天也是元气满满的一天,好好学习。..._ctfshow 龙猫

抓取动态网页的数据的具体操作方法_动态加载的网页怎么获取链接-程序员宅基地

文章浏览阅读1.9k次。不同的方法适用于不同的情况,例如如果目标网站使用的是JavaScript动态加载数据,那么使用Scrapy-Splash可能会更加适合。如果目标网站的数据比较简单,那么使用浏览器开发者工具可能会更加方便。如果需要模拟用户的操作,那么使用Selenium可能是更好的选择。总之,需要根据具体情况选择合适的方法,才能高效地获取动态网页的数据。综上所述,选择合适的方法取决于具体的需求。如果需要模拟用户的操作,可以使用Selenium。动态网页是指在用户交互过程中,网页内容不断更新和变化的网页。_动态加载的网页怎么获取链接

Ubuntu20.04安装向日葵_ubuntu20.04 安装向日库-程序员宅基地

文章浏览阅读1k次,点赞3次,收藏6次。下载最新版本:https://sunlogin.oray.com/download/缺少部分依赖,手动下载:# 你知道最新的版本号了sudo wget http://download.oray.com/sunlogin/linux/SunloginClient-10.0.2.24779_amd64.debsudo wget http://mirrors.aliyun.com/ubuntu/pool/main/i/icu/libicu60_60.2-3ubuntu3_amd64.debsudo w_ubuntu20.04 安装向日库