温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎么用MINA、Netty、Twisted来实现消息分割

发布时间:2021-12-28 15:43:06 阅读:126 作者:小新 栏目:互联网科技
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

这篇文章主要介绍了怎么用MINA、Netty、Twisted来实现消息分割,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

本文介绍一种消息分割方式,use a fixed length header that indicates the length of the body,用一个固定字节数的Header前缀来指定Body的字节数,以此来分割消息。

怎么用MINA、Netty、Twisted来实现消息分割  固定字节数的Header前缀来指定Body的字节数

上面图中 Header 固定为 4 字节,Header 中保存的是一个 4 字节(32位)的整数,例如 12 即为 0x0000000C,这个整数用来指定 Body 的长度(字节数)。当读完这么多字节的 Body 之后,又是下一条消息的 Header。

下面分别用MINA、Netty、Twisted来实现对这种消息的切合和解码。

MINA

MINA 提供了 PrefixedStringCodecFactory 来对这种类型的消息进行编码解码,PrefixedStringCodecFactory 默认 Header 的大小是4字节,当然也可以指定成1或2。

public class TcpServer {  public static void main(String[] args) throws IOException {    IoAcceptor acceptor = new NioSocketAcceptor();    // 4字节的Header指定Body的字节数,对这种消息的处理    acceptor.getFilterChain().addLast("codec",        new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));    acceptor.setHandler(new TcpServerHandle());    acceptor.bind(new InetSocketAddress(8080));  }}class TcpServerHandle extends IoHandlerAdapter {  @Override  public void exceptionCaught(IoSession session, Throwable cause)      throws Exception {    cause.printStackTrace();  }  // 接收到新的数据  @Override  public void messageReceived(IoSession session, Object message)      throws Exception {    String msg = (String) message;    System.out.println("messageReceived:" + msg);  }  @Override  public void sessionCreated(IoSession session) throws Exception {    System.out.println("sessionCreated");  }  @Override  public void sessionClosed(IoSession session) throws Exception {    System.out.println("sessionClosed");  }}

Netty

Netty 使用 LengthFieldBasedFrameDecoder 来处理这种消息。下面代码中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包含5个参数,分别是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength为消息的最大长度,lengthFieldOffset为Header的位置,lengthFieldLength为Header的长度,lengthAdjustment为长度调整(默认Header中的值表示Body的长度,并不包含Header自己),initialBytesToStrip为去掉字节数(默认解码后返回Header+Body的全部内容,这里设为4表示去掉4字节的Header,只留下Body)。

public class TcpServer {  public static void main(String[] args) throws InterruptedException {    EventLoopGroup bossGroup = new NioEventLoopGroup();    EventLoopGroup workerGroup = new NioEventLoopGroup();    try {      ServerBootstrap b = new ServerBootstrap();      b.group(bossGroup, workerGroup)          .channel(NioServerSocketChannel.class)          .childHandler(new ChannelInitializer<SocketChannel>() {            @Override            public void initChannel(SocketChannel ch)                throws Exception {              ChannelPipeline pipeline = ch.pipeline();              // LengthFieldBasedFrameDecoder按行分割消息,取出body              pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));              // 再按UTF-8编码转成字符串              pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));              pipeline.addLast(new TcpServerHandler());            }          });      ChannelFuture f = b.bind(8080).sync();      f.channel().closeFuture().sync();    } finally {      workerGroup.shutdownGracefully();      bossGroup.shutdownGracefully();    }  }}class TcpServerHandler extends ChannelInboundHandlerAdapter {  // 接收到新的数据  @Override  public void channelRead(ChannelHandlerContext ctx, Object msg) {    String message = (String) msg;    System.out.println("channelRead:" + message);  }  @Override  public void channelActive(ChannelHandlerContext ctx) {    System.out.println("channelActive");  }  @Override  public void channelInactive(ChannelHandlerContext ctx) {    System.out.println("channelInactive");  }  @Override  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {    cause.printStackTrace();    ctx.close();  }}

Twisted

在Twisted中需要继承Int32StringReceiver,不再继承Protocol。Int32StringReceiver表示固定32位(4字节)的Header,另外还有Int16StringReceiver、Int8StringReceiver等。而需要实现的接受数据事件的方法不再是dataReceived,也不是lineReceived,而是stringReceived。

# -*- coding:utf-8 –*-from twisted.protocols.basic import Int32StringReceiverfrom twisted.internet.protocol import Factoryfrom twisted.internet import reactorclass TcpServerHandle(Int32StringReceiver):    # 新的连接建立    def connectionMade(self):        print 'connectionMade'    # 连接断开    def connectionLost(self, reason):        print 'connectionLost'    # 接收到新的数据    def stringReceived(self, data):        print 'stringReceived:' + datafactory = Factory()factory.protocol = TcpServerHandlereactor.listenTCP(8080, factory)reactor.run()

下面是Java编写的一个客户端测试程序:

public class TcpClient {  public static void main(String[] args) throws IOException {    Socket socket = null;    DataOutputStream out = null;    try {      socket = new Socket("localhost", 8080);      out = new DataOutputStream(socket.getOutputStream());      // 请求服务器      String data1 = "牛顿";      byte[] outputBytes1 = data1.getBytes("UTF-8");      out.writeInt(outputBytes1.length); // write header      out.write(outputBytes1); // write body      String data2 = "爱因斯坦";      byte[] outputBytes2 = data2.getBytes("UTF-8");      out.writeInt(outputBytes2.length); // write header      out.write(outputBytes2); // write body      out.flush();    } finally {      // 关闭连接      out.close();      socket.close();    }  }}

MINA服务器输出结果:

sessionCreated

messageReceived:牛顿

messageReceived:爱因斯坦

sessionClosed

Netty服务器输出结果:

channelActive

channelRead:牛顿

channelRead:爱因斯坦

channelInactive

Twisted服务器输出结果:

connectionMade

stringReceived:牛顿

stringReceived:爱因斯坦

connectionLost

感谢你能够认真阅读完这篇文章,希望小编分享的“怎么用MINA、Netty、Twisted来实现消息分割”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

原文链接:http://blog.itpub.net/31558358/viewspace-2375357/

AI

开发者交流群×