浅谈Java对接阿里IOT_晨晨又破音啦的博客-程序员宅基地_java对接物联网

技术标签: java  物联网  

背景

博主之前工作运用的Java是为人工智能服务的,工作了一年,由于换了个城市,就选了个物联网的方向,多尝试,多学习。
物联网主要做两件事:数据采集、指令下发。

数据采集

这里就谈一下Java服务端接入SDK采集数据。用的是阿里的物联网平台,废话不多说直接贴代码,不清楚的可以看阿里官方文档

public class AmqpJavaClientDemo {
    

    private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);

    //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
    private final static ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue(50000));

    public static void main(String[] args) throws Exception {
    
        //参数说明,请参见AMQP客户端接入说明文档。
        String accessKey = "物联网平台accesskey";
        String accessSecret = "物联网平台accesssecret";
        String consumerGroupId = "物联网平台消费组";
        //iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。
        String iotInstanceId = "";//企业实例要填
        long timeStamp = System.currentTimeMillis();
        //签名方法:支持hmacmd5、hmacsha1和hmacsha256。
        String signMethod = "签名方式";
        //控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
        //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
        String clientId = "客户端";

        //userName组装方法,请参见AMQP客户端接入说明文档。
        String userName = clientId + "|authMode=aksign"
                + ",signMethod=" + signMethod
                + ",timestamp=" + timeStamp
                + ",authId=" + accessKey
                + ",iotInstanceId=" + iotInstanceId
                + ",consumerGroupId=" + consumerGroupId
                + "|";
        //计算签名,password组装方法,请参见AMQP客户端接入说明文档。
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent,accessSecret, signMethod);
        //${YourHost}为接入域名,请参见AMQP客户端接入说明文档。
        String connectionUrl = "AMQP接入域名";

        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF",connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
        Destination queue = (Destination)context.lookup("QUEUE");
        // 创建连接。
        Connection connection = cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
        // 创建会话。
        // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
        // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        // 创建Receiver连接。
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(messageListener);
    }

    private static MessageListener messageListener = new MessageListener() {
    
        @Override
        public void onMessage(final Message message) {
    
            try {
    
                //1.收到消息之后一定要ACK。
                // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
                // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
                // message.acknowledge();
                //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
                // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
                executorService.submit(new Runnable() {
    
                    @Override
                    public void run() {
    
                        processMessage(message);
                    }
                });
            } catch (Exception e) {
    
                logger.error("submit task occurs exception ", e);
            }
        }
    };

    /**
     * 在这里处理您收到消息后的具体业务逻辑。
     */
    private static void processMessage(Message message) {
    
        try {
    
            byte[] body = message.getBody(byte[].class);
            String content = new String(body);
            String topic = message.getStringProperty("topic");
            String messageId = message.getStringProperty("messageId");
            logger.info("receive message"
                    + ",\n topic = " + topic
                    + ",\n messageId = " + messageId
                    + ",\n content = " + content);
        } catch (Exception e) {
    
            logger.error("processMessage occurs error ", e);
        }
    }

    private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
    
        /**
         * 连接成功建立。
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
    
            logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }

        /**
         * 尝试过最大重试次数之后,最终连接失败。
         */
        @Override
        public void onConnectionFailure(Throwable error) {
    
            logger.error("onConnectionFailure, {}", error.getMessage());
        }

        /**
         * 连接中断。
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
    
            logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }

        /**
         * 连接中断后又自动重连上。
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
    
            logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {
    }

        @Override
        public void onSessionClosed(Session session, Throwable cause) {
    }

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
    }

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {
    }
    };

    /**
     * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
     */
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
    
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes());
        return Base64.encodeBase64String(rawHmac);
    }
}

至此,java服务端已经可以成功接入IOT收到设备端的消息了,接入后的逻辑主要是在processMessage这个方法里实现监听、捕获设备端的事件消息等。

指令下发

指令下发服务端主要用到的是下发服务和事件,最常用的是下发服务,阿里官方文档API,下发服务代码示例:

 IClientProfile profile = DefaultProfile.getProfile("regionId区域id", "accessKeyId", "secret");
 DefaultAcsClient client = new DefaultAcsClient(profile);
InvokeThingServiceRequest request = new InvokeThingServiceRequest();
String param = "物联网平台功能定义的参数";
request.setArgs(param);

上报事件(可用Pub的方式,前提是物联网平台的功能定义要是一个事件,而非属性或服务),上报事件的代码:

 IClientProfile profile = DefaultProfile.getProfile("regionId", "accessKey", "accessSecret");
        DefaultAcsClient client = new DefaultAcsClient(profile);
        PubRequest request = new PubRequest();
        request.setProductKey("物联网平台的productkey");
        String param = "物联网平台功能定义的参数";
        request.setMessageContent(Base64.encodeBase64String(param.getBytes()));
        request.setTopicFullName("topic路径");
       

上报事件或者下发服务成功后会有一个response相应信息,应用场景不同response也不同,可以在官方文档里看一下

文章如果有什么不足之处,欢迎大家指出交流

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_45546333/article/details/118104970

智能推荐

eclipse配置servlet路径出错问题_returnadsss的博客-程序员宅基地_servlet配置出错

如图,配置完全正常,但是每次运行时都会在前面多一个servlet,一直在找原因,也比对了网上的教程,路径完全正确,照着书上的重新来了一遍,也是正常的,最后新建一个项目,再运行一遍就正常了,估计是之前的xml写的不规范,导致系统中有些记录遗留,所以路径不正确,新建项目,把代码复制过去即可。...

【Linux运维】第一章:服务器硬件及linux初体验_就叫一片白纸的博客-程序员宅基地

第一章:服务器硬件及linux初体验一、了解linux;1.linux的三大分支:debian、redhat、ubuntu2.redhat版本:redhat:红帽,简称RHEL,企业级官方版本,收费版本;centos:企业社区操作系统版本,不属于红帽公司,却基于红帽的源,代码组件进行重构的社区版;3.安装linux系统注意点:a.将ISO镜像文件放到光驱,点击连接;b.使用默认的磁...

c语言编译为机器码,单片机C语言到机器码的全过程_weixin_39811101的博客-程序员宅基地

我们平时做单片机编程,最常使用的是C语言和汇编语言两种,但是最终下载到单片机内部的,却是HEX文件里的二进制机器代码,很多同学不懂这个过程是如何转换的。我们的程序开发环境,比如KEIL和伟福等等,都是可以使用C或者汇编语言来编程的。对于C语言来讲,一旦我们编程完毕,就可以点击编译,编译的过程,实际上首先通过“编译器”将C语言翻译成汇编语言,再通过“汇编器”将汇编语言转化成机器代码,最终可以下载到单...

战歌网php,教你怎么采集傲气战歌网歌曲(包括分类)_weixin_39706127的博客-程序员宅基地

$list = "\n\n";//首先获取本采集程序地址$fname = 'http://' . $_SERVER['SERVER_NAME'] . $_SERVER["SCRIPT_NAME"];//建立傲气战歌13个分类数组,分类基本是死的就不实现采集列出了(接口也难找)$name = array ("每月新曲","MC佳瑶","指挥战歌","DJ音乐","推荐嗨曲","国外战歌","激情战歌...

mysql输入中文出现ERROR 1366_H_MZ的博客-程序员宅基地

mysql输入中文出现如下错误:ERROR 1366: 1366: Incorrect string value: '\xE6\xB0\xB4\xE7\x94\xB5...' for column 'introduce' at row 1这是因为mysql编码的问题 打开mysqlmysql -u root -p...

centos 6 python django mysql_CentOS + Python3.6+ Django2.0 + uwsgi + nginx + mysql web发布环境搭建..._动机在杭州的博客-程序员宅基地

目录:CentOS上升级Python安装easy_install和pipuwsgi安装及测试Django安装及测试连接uwsgi与Djangonginx安装及测试连接uwsgi与nginx连接uwsgi与Django与nginxuwsgi inimysql安装设置python3 Django mysql连接及测试快速搭建blog测试Pycharm开发如果只是想学习django开发直接用django...

随便推点

python 结巴分词(jieba)学习_zhangqixiang5449的博客-程序员宅基地

转:http://www.tuicool.com/articles/QV36ru 源码下载的地址:https://github.com/fxsjy/jieba演示地址:http://jiebademo.ap01.aws.af.cm/特点 1,支持三种分词模式: a,精确模式,试图将句子最精确地切开,适合文本分析; b,全模式,把句子中所有的可以成词的词语都扫描出来, 速度非

python pip有什么用_pip的介绍和使用_weixin_39730911的博客-程序员宅基地

我们都知道python有很多的第三方库或者说是模块。这些库针对不同的应用,发挥不同的作用。我们在实际的项 目中肯定会用到这些模块。那如何将这些模块导入到自己的项目中呢?Python官方的PyPi仓库为我们提供了一个统一的代码托管仓库,所有的第三方库,甚至你自己写的开源模块,都 可以发布到这里,让全世界的人分享下载 。python有两个著名的包管理工具easy_install和pip。在python...

来信, 创业 和 移动互联网_无明客的博客-程序员宅基地

上一篇博文翻译了Steve Yegge的rant,这两天有一些事让我也想rant一下(所谓rant就是一篇巨长无比的抱怨和说教),不过无论是从见解还是恶搞来说肯定没有SteveY的水平高,所以,这篇博文只是单纯的rant,看标题就知道了,就像“篱笆,女人和狗”一样,乡土味实足。所以,下述的一些观点未必正确,也未必靠谱,也就是我的个人唠叨罢了,我想到哪里说到哪里。(篇幅较长,见谅)引子

python打包_Python打包的艺术(二)- Packaging Tools的昨天,今天,明天_weixin_39937412的博客-程序员宅基地

Packaging Tools的昨天,今天,明天截至今年3月,Python的打包工具链处于的状态可以用“非常混乱”来形容。除了官方的distutils,第三方有setuptools(easy_install),distribute,pip,zc.buildout。这些都是相当有名的,还有不是很有名的,如bento,c而setuptools,distribute和pip正如综述里介绍的一样,联系又是...

[源码解析] 深度学习分布式训练框架 horovod (15) — 广播 & 通知_罗西的思考的博客-程序员宅基地

Horovod 是Uber于2017年发布的一个易于使用的高性能的分布式训练框架,在业界得到了广泛应用。本系列将通过源码分析来带领大家了解 Horovod。本文是系列第十五篇,看看horovod 弹性训练如何广播和发送通知。

osmdroid API解读(二)_Tiny小祥的博客-程序员宅基地

osmdroid API解读(二)osmdroid-android模块 org.osmdroid.config包1、IConfigurationProvider/*** 用于 get/set a configuration provider for osmdroid*/public interface IConfigurationProvider { //get&set 使用gps位置后

推荐文章

热门文章

相关标签