需求:
一个三方的广播、对讲系统,需要集成到已有的平台,已有平台采用的是B/S架构,也就是用户通过Web端来实现管理需求;
三方的广播、对讲系统则是一个C/S架构的程序,提供三方可以定制开发的Windows端的SDK,这就有了这个积木式程序的开发!
框架图如下:最终是实现一个windows端的本地服务,安装在用户的PC电脑上,web端调度员使用web就能实现相同的C/S端程序的能力!
web端集成mqtt的客户端,接入MQTT服务器,然后通过推送给windows本地服务下发控制指令,比方开始广播、停止广播等指令!
windows本地服务接收到推送指令后,调用相应的功能模块提供的API,完成本地的功能调用,并将结果通过推送通知到web端!
mqtt推送指令中使用json作为消息体,传递控制和回应消息,请求格式和相应格式如下图:
麻雀虽小五脏俱全,集成了glog,方便后续问题定位和分析!
线程安全的队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 | //AsyncExecQueue.h #pragma once #include <stdio.h> #include <string> #include <iostream> #include <atomic> #include <mutex> #include <queue> #include "logutils.h" using namespace std; class MessageItem { public : MessageItem() { m_topic = NULL; m_content = NULL; } MessageItem(string &topic, string &content) { m_topic = ( char *) malloc (topic.length() + 1); if (m_topic == NULL) { LOGD( "m_topic == NULL" ); return ; } memset (m_topic, 0x00, topic.length() + 1); memcpy (m_topic, topic.c_str(), topic.length()); m_topic[topic.size()] = '\0' ; m_content = ( char *) malloc (content.size() + 1); if (m_content == NULL) { LOGD( "m_topic == NULL" ); return ; } memset (m_content, 0x00, content.size() + 1); memcpy (m_content, content.c_str(), content.size()); m_content[content.size()] = '\0' ; LOGD( "addr:%0x value:%s, size:%d, content:%0x, value:%s size:%d" , m_topic, m_topic, topic.length(), m_content, m_content, content.size()); } ~MessageItem() { if (m_topic) { free (m_topic); m_topic = NULL; } if (m_content) { free (m_content); m_content = NULL; } } char *m_topic; char *m_content; }; //自旋锁类 class SpinMutex { private : atomic_flag flag = ATOMIC_FLAG_INIT; public : void lock() { while (flag.test_and_set(memory_order_acquire)); } void unlock() { flag.clear(std::memory_order_release); } }; class AsyncExecQueue { private : size_t maxsz; mutable SpinMutex mutx; queue<shared_ptr<MessageItem>> que; AsyncExecQueue() { this ->maxsz = 0; } public : //实现单例模式 static AsyncExecQueue* Instance() { static AsyncExecQueue obj; return &obj; } //任务对象出队 bool pop(shared_ptr<MessageItem>& item); //任务对象入队 bool push(shared_ptr<MessageItem> item); }; //AsyncExecQueue.cpp #include "AsyncExecQueue.h" //任务对象出队 bool AsyncExecQueue::pop(shared_ptr<MessageItem>& item) { lock_guard<SpinMutex> lk(mutx); if (que.empty()) return false ; item = que.front(); que.pop(); return true ; } //任务对象入队 bool AsyncExecQueue::push(shared_ptr<MessageItem> item) { std::lock_guard<SpinMutex> lk(mutx); if (maxsz > 0 && que.size() >= maxsz) return false ; que.push(item); return true ; } |
-------------------广告线---------------
项目、合作,欢迎勾搭,邮箱:promall@qq.com
本文为呱牛笔记原创文章,转载无需和我联系,但请注明来自呱牛笔记 ,it3q.com