本文主要包括以下内容:
通过生产者-消费者模式保证数据链路的鲁棒性
改进音频录制及播放,提高语音通信质量
采用多播实现设备发现及跨路由通信
实现对讲进程与UI进程的通信(AIDL)
在《实时Android语音对讲系统架构》对语音对讲系统的数据链路的分析中提到,数据包要经过Record、Encoder、Transmission、Decoder、Play这一链条的处理,这种数据流转就是对讲机核心抽象,鉴于这种场景,采用了责任链设计模式。
在后续实践中发现这样的结构存在一些问题,责任链模式适用于数据即时流转,需要整个链路没有阻塞、等待。而在本应用场景中,编解码及录制播放均可能存在时间延迟,责任链模式无法兼顾网络、编解码的延时。
事实上,通过缓存队列则可以保证数据链路的稳定性,分别在编解码和数据发送接收时加入阻塞队列,可以实现数据包的缓冲,同时降低丢包的可能。因此,在本系统场景下,基于阻塞队列实现了生产者-消费者模式,是对责任链模式的优化,意在提高数据链路的鲁棒性。
本节包括以下内容:
阻塞队列(数据结构)
阻塞队列实现生产者-消费者模式
阻塞队列(数据结构)
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
在队列为空时,获取元素的线程会等待队列变为非空。
当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 | |
---|---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) | |
移除方法 | remove() | poll() | take() | poll(time,unit) | |
检查方法 | element() | peek() | 不可用 | 不可用 |
抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
本文通过LinkedBlockingQueue的put和take方法实现线程阻塞。LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
首先看下LinkedBlockingQueue中核心的域:
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }private final int capacity;private final AtomicInteger count = new AtomicInteger();transient Node<E> head;private transient Node<E> last;private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue和LinkedList类似,通过静态内部类Node<E>进行元素的存储;
capacity表示阻塞队列所能存储的最大容量,在创建时可以手动指定最大容量,默认的最大容量为Integer.MAX_VALUE;
count表示当前队列中的元素数量,LinkedBlockingQueue的入队列和出队列使用了两个不同的lock对象,因此无论是在入队列还是出队列,都会涉及对元素数量的并发修改,因此这里使用了一个原子操作类来解决对同一个变量进行并发修改的线程安全问题。
head和last分别表示链表的头部和尾部;
takeLock表示元素出队列时线程所获取的锁,当执行take、poll等操作时线程获取;notEmpty当队列为空时,通过该Condition让获取元素的线程处于等待状态;
putLock表示元素入队列时线程所获取的锁,当执行put、offer等操作时获取;notFull当队列容量达到capacity时,通过该Condition让加入元素的线程处于等待状态。
其次,LinkedBlockingQueue有三个构造方法,分别如下:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
默认构造函数直接调用LinkedBlockingQueue(int capacity),LinkedBlockingQueue(int capacity)会初始化首尾节点,并置位null。LinkedBlockingQueue(Collection<? extends E> c)在初始化队列的同时,将一个集合的全部元素加入队列。
最后,重点分析下put和take的过程:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
之所以把put和take放在一起,是因为它们是一对互逆的过程:
put在插入元素前首先获得putLock和当前队列的元素数量,take在去除元素钱首先获得takeLock和当前队列的元素数量;
put时需要判断当前队列是否已满,已满时当前线程进行等待,take时需要判断队列是否已空,队列为空时当前线程进行等待;
put调用enqueue在队尾插入元素,并修改尾指针,take调用dequeue将head指向原来first的位置,并将first的数据域置位null,实现删除原first指针,并产生新的head,同时,切断原head节点的引用,便于垃圾回收。
private void enqueue(Node<E> node) { last = last.next = node; }private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GChead = first; E x = first.item; first.item = null;return x; }
最后,put根据count决定是否触发队列未满和队列空;take根据count决定是否触发队列未空和队列满。
LinkedBlockingQueue在入队列和出队列时使用的是不同的Lock,这也意味着它们之间的操作不会存在互斥。在多个CPU的情况下,可以做到在同一时刻既消费、又生产,做到并行处理。
阻塞队列实现生产者-消费者模式
通过对LinkedBlockingQueue主要源码的分析,实现生产者-消费者模式就变得简单了。
public class MessageQueue { private static MessageQueue messageQueue1, messageQueue2, messageQueue3, messageQueue4; private BlockingQueue<AudioData> audioDataQueue = null; private MessageQueue() { audioDataQueue = new LinkedBlockingQueue<>(); } @Retention(SOURCE) @IntDef({ENCODER_DATA_QUEUE, SENDER_DATA_QUEUE, DECODER_DATA_QUEUE, TRACKER_DATA_QUEUE}) public @interface DataQueueType { } public static final int ENCODER_DATA_QUEUE = 0; public static final int SENDER_DATA_QUEUE = 1; public static final int DECODER_DATA_QUEUE = 2; public static final int TRACKER_DATA_QUEUE = 3; public static MessageQueue getInstance(@DataQueueType int type) { switch (type) { case ENCODER_DATA_QUEUE: if (messageQueue1 == null) { messageQueue1 = new MessageQueue(); } return messageQueue1; case SENDER_DATA_QUEUE: if (messageQueue2 == null) { messageQueue2 = new MessageQueue(); } return messageQueue2; case DECODER_DATA_QUEUE: if (messageQueue3 == null) { messageQueue3 = new MessageQueue(); } return messageQueue3; case TRACKER_DATA_QUEUE: if (messageQueue4 == null) { messageQueue4 = new MessageQueue(); } return messageQueue4; default: return new MessageQueue(); } } public void put(AudioData audioData) { try { audioDataQueue.put(audioData); } catch (InterruptedException e) { e.printStackTrace(); } } public AudioData take() { try { return audioDataQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
这里通过@IntDef来实现限定输入类型的功能,同时,阻塞队列保持单实例,然后将队列分别应用到各个生产者-消费者线程中。在本文的语音对讲系统中,以音频录制线程和编码线程为例,录制线程是音频数据包的生产者,编码线程是音频数据包的消费者。
音频录制线程:
@Overridepublic void run() { while (isRecording) { if (audioRecord.getRecordingState() == AudioRecord.RECORDSTATE_STOPPED) { audioRecord.startRecording(); } // 实例化音频数据缓冲 short[] rawData = new short[inAudioBufferSize]; audioRecord.read(rawData, 0, inAudioBufferSize); AudioData audioData = new AudioData(rawData); MessageQueue.getInstance(MessageQueue.ENCODER_DATA_QUEUE).put(audioData); } }
编码线程:
@Overridepublic void run() { AudioData data; // 在MessageQueue为空时,take方法阻塞 while ((data = MessageQueue.getInstance(MessageQueue.ENCODER_DATA_QUEUE).take()) != null) { data.setEncodedData(AudioDataUtil.raw2spx(data.getRawData())); MessageQueue.getInstance(MessageQueue.SENDER_DATA_QUEUE).put(data); }}
同样的,编码线程和发送线程,接收线程和解码线程,解码线程和播放线程同样存在生产者-消费者的关系。
录制,改变了音频输入源,将直接从麦克风(MIC)获取改为MediaRecorder.AudioSource.VOICE_COMMUNICATION,VOICE_COMMUNICATION能自动回声消除和增益,因此,屏蔽了speex在C层的降噪和增益。
播放,改变了音频输出端,将STREAM_MUSIC换成STREAM_VOICE_CALL,因为,对讲机应用更类似于语音通信。换成STREAM_VOICE_CALL之后,遇到的问题是只能从听筒听到声音,于是设置免提功能。
AudioManager audioManager =(AudioManager) getSystemService(Context.AUDIO_SERVICE); audioManager.setMode(AudioManager.MODE_IN_COMMUNICATION); audioManager.setSpeakerphoneOn(true);
该设置必须要开放修改音频的权限,不然没有效果。
<uses-permission android:name="android.permission.MODIFY_AUDIO_SETTINGS"/>
目前的语音通信质量,个人感觉仍然需要继续优化,如果您有这方面的经验(包括但不限于Java层和Speex音频处理),不吝赐教!
《通过UDP广播实现Android局域网Peer Discovering》中从编程的角度说明了TCP与UDP的区别,主要分析了TCP是面向连接的、可靠的服务,建立连接需要经过三次握手、销毁连接需要四次挥手;UDP是无连接的传输层协议,提供面向事务的简单不可靠信息传送服务。
IP地址分为三类:单播、广播和多播。广播和多播仅用于UDP,它们用于将报文同时传送给多个接收者。广播分为:受限广播、指向网络的广播、指向子网的广播、指向所有子网的广播。
举个栗子:当前IP为10.13.200.16/22,首先广播地址为255.255.255.255,子网广播地址为10.13.203.255。
《通过UDP广播实现Android局域网Peer Discovering》采用子网广播实现局域网Android设备的发现,但在实践中,一般路由器会禁止所有广播跨路由器传输。所以,如果子网内有多个路由器,那么就无法实现设备发现了。因此,本文将设备发现也改为多播实现。多播组地址包括为1110的最高4bit和多播组号,范围为224.0.0.0到239.255.255.255。能够接收发往一个特定多播组地址数据的主机集合称为主机组,主机组可以跨越多个网络。
IANA 把224.0.0.0 到 224.0.0.255 范围内的地址全部都保留给了路由协议和其他网络维护功能。该范围内的地址属于局部范畴,不论生存时间字段(TTL)值是多少,都不会被路由器转发;D类保留地址的完整的列表可以参见RFC1700。
224.0.1.0 到 238.255.255.255 地址范围作为用户组播地址,在全网范围内有效。其中233/8 为 GLOP 地址。GLOP 是一种自治系统之间的组播地址分配机制,将 AS 号直接填入组播地址的中间两个字节中,每个自治系统都可以得到 255 个组播地址;
239.0.0.0 到 239.255.255.255 地址范围为本地管理组播地址(administratively scoped addresses),仅在特定的本地范围内有效。
本文对比了子网广播和多播,子网广播地址为:192.168.137.255,多播组地址为:224.5.6.7。
发送接收采用同一MulticastSocket,MulticastSocket设置TTL,TTL表示跨网络的级数。
try { inetAddress = InetAddress.getByName(Constants.MULTI_BROADCAST_IP); multicastSocket = new MulticastSocket(Constants.MULTI_BROADCAST_PORT); multicastSocket.setLoopbackMode(true); multicastSocket.joinGroup(inetAddress); multicastSocket.setTimeToLive(4); } catch (IOException e) { e.printStackTrace(); }
joinGroup涉及到另一个协议:网路群组管理协议(Internet Group Management Protocol或简写IGMP),通过抓包可以观察到初始化MulticastSocket时加入组协议的报文。
setTimeToLive用于设置生存时间字段。默认情况下,多播数据报的TTL设置为1,使得多播数据报仅限于在同一个子网内传送,更大的TTL值能够被多播路由器转发。在实际传输过程中,多播组地址仍然需要转换为以太网地址。实际转换规则这里不再赘述。
上述多播地址224.5.6.7转换后为01:00:5e:05:06:07。
代码层面上,探测线程将子网广播改为多播实现。
if (command != null) { byte[] data = command.getBytes(); DatagramPacket datagramPacket = new DatagramPacket( data, data.length, Multicast.getMulticast().getInetAddress(), Constants.MULTI_BROADCAST_PORT); try { Multicast.getMulticast().getMulticastSocket().send(datagramPacket); } catch (IOException e) { e.printStackTrace(); } }
并且在接收端区分指令和音频数据。
while (true) { // 设置接收缓冲段 byte[] receivedData = new byte[512]; DatagramPacket datagramPacket = new DatagramPacket(receivedData, receivedData.length); try { // 接收数据报文 Multicast.getMulticast().getMulticastSocket().receive(datagramPacket); } catch (IOException e) { e.printStackTrace(); } // 判断数据报文类型,并做相应处理 if (datagramPacket.getLength() == Command.DISC_REQUEST.getBytes().length || datagramPacket.getLength() == Command.DISC_LEAVE.getBytes().length || datagramPacket.getLength() == Command.DISC_RESPONSE.getBytes().length) { handleCommandData(datagramPacket); } else { handleAudioData(datagramPacket); } }
在实际工程应用场景中,需要对讲机进程即使切换到后台,也依然能收到信息。因此,为了提高进程的优先级,降低被系统回收的概率,采用了在Service中访问网络服务,处理语音信息的发送和接收的方案。前台Activity负责显示组播组内用户(上线和下线,更新页面),通过AIDL与Service进行跨进程通信和回调。Service的清单说明如下:
<service android:name=".service.IntercomService" android:process=":intercom" />
:intercom表示定义子进程intercom。
使用多进程相比于常见的单进程,有一些需要注意的点:
静态成员和单例模式失效。因为每个进程都会分配一个独立的虚拟机,不同的虚拟机对应不同的地址空间;
线程同步机制失效。因此不同进程锁的并不是同一个对象;
Application会多次创建。进程与Application对应,多进程会启动多个Application。
因此,通过process定义了多进程之后,一定要避免单进程模式下对象共享的思路。另外,在AS中调试多进程应用的时候,断点一定要针对不同的进程,以本文为例,添加断点需要选择主进程和intercom进程。给两个进程分别添加调试断点后,可以看到有两个Debugger:3156和3230(由于存在Jni代码,所以显示了Hybrid Debugger)。
由于既存在Activity到Service的通信,也存在Service接收到消息之后更新Activity页面的需求,所以这里采用了跨进程回调的方式。首先,AIDL方法如下:
package com.jd.wly.intercom.service;import com.jd.wly.intercom.service.IIntercomCallback;interface IIntercomService { void startRecord(); void stopRecord(); void registerCallback(IIntercomCallback callback); void unRegisterCallback(IIntercomCallback callback); }
package com.jd.wly.intercom.service;interface IIntercomCallback { void findNewUser(String ipAddress); void removeUser(String ipAddress); }
IIntercomService定义了Activity到Service的通信方法,包含启动和停止音频录制,以及注册和解除回调接口;IIntercomCallback定义了从Service到Activity的回调接口,用于在Service发现用户上线、下线时通知前台Activity的显示。
AIDL文件的定义涉及一些规范:比如变量在同一包内也需要import,非基本数据类型参数列表需要指明in、out,自定义参数类型需要同时编写java文件和aidl文件等,本文篇幅有限,就不具体展开AIDL跨进程通信的细节了。
Activity检测用户的按键操作,然后将事件传递给Service进行对应的逻辑处理。
将Service绑定到Activity首先需要定义ServiceConnection:
/** * onServiceConnected和onServiceDisconnected运行在UI线程中 */private IIntercomService intercomService;private ServiceConnection serviceConnection = new ServiceConnection() { @Override public void onServiceConnected(ComponentName name, IBinder service) { intercomService = IIntercomService.Stub.asInterface(service); try { intercomService.registerCallback(intercomCallback); } catch (RemoteException e) { e.printStackTrace(); } } @Override public void onServiceDisconnected(ComponentName name) { intercomService = null; } };
在onStart()时绑定Service,onStop()时解除回调和绑定。
@Overrideprotected void onStart() { super.onStart(); Intent intent = new Intent(AudioActivity.this, IntercomService.class); bindService(intent, serviceConnection, BIND_AUTO_CREATE); }
@Overrideprotected void onStop() { super.onStop(); if (intercomService != null && intercomService.asBinder().isBinderAlive()) { try { intercomService.unRegisterCallback(intercomCallback); } catch (RemoteException e) { e.printStackTrace(); } unbindService(serviceConnection); } }
Activity获取了Service的服务后,分别在按键事件处理中进行调用。
@Overridepublic boolean onKeyDown(int keyCode, KeyEvent event) { if ((keyCode == KeyEvent.KEYCODE_F2 || keyCode == KeyEvent.KEYCODE_VOLUME_UP || keyCode == KeyEvent.KEYCODE_VOLUME_DOWN)) { try { intercomService.startRecord(); } catch (RemoteException e) { e.printStackTrace(); } return true; } return super.onKeyDown(keyCode, event); }@Overridepublic boolean onKeyUp(int keyCode, KeyEvent event) { if ((keyCode == KeyEvent.KEYCODE_F2 || keyCode == KeyEvent.KEYCODE_VOLUME_UP || keyCode == KeyEvent.KEYCODE_VOLUME_DOWN)) { try { intercomService.stopRecord(); } catch (RemoteException e) { e.printStackTrace(); } return true; } return super.onKeyUp(keyCode, event); }
startRecord和stopRecord的具体实现定义在Service中:
public IIntercomService.Stub mBinder = new IIntercomService.Stub() { @Override public void startRecord() throws RemoteException { if (!recorder.isRecording()) { recorder.setRecording(true); tracker.setPlaying(false); threadPool.execute(recorder); } } @Override public void stopRecord() throws RemoteException { if (recorder.isRecording()) { recorder.setRecording(false); tracker.setPlaying(true); } } @Override public void registerCallback(IIntercomCallback callback) throws RemoteException { mCallbackList.register(callback); } @Override public void unRegisterCallback(IIntercomCallback callback) throws RemoteException { mCallbackList.unregister(callback); } };
Service通过RemoteCallbackList保持回调方法,使用时首先定义RemoteCallbackList对象,泛型类型为IIntercomCallback。
private RemoteCallbackList<IIntercomCallback> mCallbackList = new RemoteCallbackList<>();
RemoteCallbackList并不是List,内部通过Map来保存,Key和Value分别为IBinder和Callback。
ArrayMap<IBinder, Callback> mCallbacks = new ArrayMap<IBinder, Callback>();
使用RemoteCallbackList回调Activity方法时,通过beginBroadcast获取数量,
/** * 发现新的组播成员 * * @param ipAddress IP地址 */private void findNewUser(String ipAddress) { final int size = mCallbackList.beginBroadcast(); for (int i = 0; i < size; i++) { IIntercomCallback callback = mCallbackList.getBroadcastItem(i); if (callback != null) { try { callback.findNewUser(ipAddress); } catch (RemoteException e) { e.printStackTrace(); } } } mCallbackList.finishBroadcast(); }
removeUser(String ipAddress)方法与findNewUser(String ipAddress)方法类似。它们具体的实现在Activity中:
/** * 被调用的方法运行在Binder线程池中,不能更新UI */private IIntercomCallback intercomCallback = new IIntercomCallback.Stub() { @Override public void findNewUser(String ipAddress) throws RemoteException { sendMsg2MainThread(ipAddress, FOUND_NEW_USER); } @Override public void removeUser(String ipAddress) throws RemoteException { sendMsg2MainThread(ipAddress, REMOVE_USER); } };
需要注意的是,IIntercomCallback中的回调方法实现并不在UI线程中执行,如果需要更新UI,需要实现多线程调用,多线程依然通过Handler来实现
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。