温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

redis流数据推送多用户的方法是什么

发布时间:2021-12-18 17:29:27 来源:亿速云 阅读:200 作者:iii 栏目:大数据

本篇内容主要讲解“redis流数据推送多用户的方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“redis流数据推送多用户的方法是什么”吧!

1 当用户几百 几千个时 如何推送?取缔线程池  采用单线程异步同步推送。

redis流数据推送多用户的方法是什么

2 现在的逻辑:

每次项目重新启动: 初始化channel 、服务端断开连接-重新连接。当有服务连接不上的时候定时器连接。

这样会产生一个问题:异步连接的任务特别多 导致服务奔溃 

/** * 关闭连接时 */@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {  InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();  int port = ipSocket.getPort();  String host = ipSocket.getHostString();  String serverUrl = host + ":" + port;  String prev = redisTemplate.opsForValue().get(SOCKET_CONNECT_PREFIX + serverUrl);  Integer cur = Math.toIntExact(System.currentTimeMillis() / 1000);  if (null == prev || cur - Integer.parseInt(prev) > 20 * 60) {    redisTemplate.opsForValue().set(SOCKET_CONNECT_PREFIX + serverUrl, cur + "");    log.info("服务端断开连接=====" + host + port + "20分钟之后 重新连接");    final EventLoop eventLoop = ctx.channel().eventLoop();    Bootstrap bootstrap = defaultProcessHandler.getBootstrap(eventLoop);    eventLoop.schedule(        () -> defaultProcessHandler.doConnect(bootstrap, host + ":" + port, new AtomicInteger(0)),        20,        TimeUnit.MINUTES);  } else {    log.warn(serverUrl + "距离上次断开连接不足20分钟==" + (cur - Integer.parseInt(prev)) + "s");  }  super.channelInactive(ctx);}

在系统启动初始化 连接10次:

redis流数据推送多用户的方法是什么

3 如何保证发送数据的完整性  TCP 粘包问题

服务端添加按换行符分隔的解码器;

serverBootstrap.childHandler(new ChannelInitializer<Channel>() {  @Override  protected void initChannel(Channel channel) {    //此方法每次客户端连接都会调用,是为通道初始化的方法
   //获得通道channel中的管道链(执行链、handler链)    ChannelPipeline pipeline = channel.pipeline();    pipeline.addLast(new LineBasedFrameDecoder(Short.MAX_VALUE * 10));    pipeline.addLast(new StringDecoder());    pipeline.addLast(new StringHandler());
   log.info("success to initHandler!");  }});

 

4 关于推送服务断开之后 重新连接上来 接着上次发送的位置继续推送。
ChannelFuture future = channels.get(callbackUrl.getUrl());if (null != future) {  try {    boolean result = sendMsg(future, pushDataStr);    this.redisTemplate.opsForValue().set(callbackUrl.getUrl(), offset.toString());    if (!result) {      log.error("this channel push failed {}", callbackUrl.getUrl());      returnVal = false;    }  } catch (Exception e) {    log.error("push exception", e);    returnVal = false;  }}return returnVal;

当推送成功  会记录次用户 此次的数据游标数据到redis.

当推送服务挂断之后,会进行任务的初始化 此时会从redis中读取每个客户上次读取的位置offest 提交到任务线程池

这个问题同样解决了服务重启之后,依然可以从上次读取结束的位置接着读取。读取任务的开始游标位置 :是上次服务成功处理后的游标。

redis流数据推送多用户的方法是什么

redis流数据推送多用户的方法是什么

5 当接收数据的服务端 重新断开之后,如何保证接着上次读取的位置?

BaseReceiver 接口  handler 方法  每次接收到数据 返回当前游标值。当进行业务处理成功之后  返回true .会自动进行后续数据读取。当客户那边的接收数据服务端挂了之后 首先会进行自动重连操作,此时读取datahub数据的线程依然在返回数据 但是不能推送成功,所以游标值 不会后移。

到此,相信大家对“redis流数据推送多用户的方法是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI