本篇文章为大家展示了WebFlux定点推送以及全推送灵活websocket运用是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
WebFlux 本身提供了对 WebSocket 协议的支持,处理 WebSocket 请求需要对应的 handler 实现 WebSocketHandler 接口,每一个 WebSocket 都有一个关联的 WebSocketSession,包含了建立请求时的握手信息 HandshakeInfo
,以及其它相关的信息。可以通过 session 的 receive()
方法来接收客户端的数据,通过 session 的 send()
方法向客户端发送数据。
下面是一个简单的 WebSocketHandler 示例:
@Component public class EchoHandler implements WebSocketHandler { public Mono<Void> handle(WebSocketSession session) { return session.send( session.receive().map( msg -> session.textMessage("ECHO -> " + msg.getPayloadAsText()))); } }
有了 handler 之后,还需要让 WebFlux 知道哪些请求需要交给这个 handler 进行处理,因此要创建相应的 HandlerMapping。
在处理 HTTP 请求时,我们经常使用 WebFlux 中最简单的 handler 定义方式,即通过注解 @RequestMapping
将某个方法定义为处理特定路径请求的 handler。 但是这个注解是用于处理 HTTP 请求的,对于 WebSocket 请求而言,收到请求后还需要协议升级的过程,之后才是 handler 的执行,所以我们不能直接通过该注解定义请求映射,不过可以使用 SimpleUrlHandlerMapping 来添加映射。
@Configuration public class WebSocketConfiguration { @Bean public HandlerMapping webSocketMapping(EchoHandler echoHandler) { final Map<String, WebSocketHandler> map = new HashMap<>(1); map.put("/echo", echoHandler); final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); mapping.setOrder(Ordered.HIGHEST_PRECEDENCE); mapping.setUrlMap(map); return mapping; } @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }
这样就能够将发往 /echo
的 WebSocket 请求交给 EchoHandler 处理。
我们还要为 WebSocket 类型的 handler 创建对应的 WebSocketHandlerAdapter,以便让 DispatcherHandler 能够调用我们的 WebSocketHandler。
完成这三个步骤后,当一个 WebSocket 请求到达 WebFlux 时,首先由 DispatcherHandler 进行处理,它会根据已有的 HandlerMapping 找到这个 WebSocket 请求对应的 handler,接着发现该 handler 实现了 WebSocketHandler 接口,于是会通过 WebSocketHandlerAdapter 来完成该 handler 的调用。
从上面的例子不难看出,没接收一个请求后,就得在里面里面返回消息,后面就不能再给他发消息了。其次是我每次新添加或者删除一个消息的处理类Handler,就得每次去修改配置文件中的SimpleUrlHandlerMapping的UrlMap的内容,感觉不是很友好。于是针对这2点进行修改和调整如下:
我们能否像注册 HTTP 请求的 Handler 那样,也通过类似 RequestMapping 的注解来注册 Handler 呢?
虽然官方没有相关实现,但我们可以自己实现一个类似的注解,不妨叫作 WebSocketMapping
:
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface WebSocketMapping { String value() default ""; }
@Retention(RetentionPolicy.RUNTIME)
表明该注解工作在运行期间,@Target(ElementType.TYPE)
表明该注解作用在类上。
我们先看下该注解最终的使用方式。下面是一个 TimeHandler 的示例,它会每秒钟会向客户端发送一次时间。我们通过注解 @WebSocketMapping("/time")
完成了 TimeHandler 的注册,告诉 WebFlux 当有 WebSocket 请求发往 /echo
路径时,就交给 EchoHandler 处理:
@Component @WebSocketMapping("/echo") public class EchoHandler implements WebSocketHandler { @Override public Mono<Void> handle(final WebSocketSession session) { return session.send( session.receive() .map(msg -> session.textMessage( "服务端返回:小明, -> " + msg.getPayloadAsText()))); } }
是不是和 RequestMapping 一样方便?
到目前为止,这个注解还没有实际的功能,还不能自动注册 handler。回顾我们上面注册路由的方式,我们创建了一个 SimpleUrlHandlerMapping,并手动添加了 EchoHandler 的映射规则,然后将其作为 HandlerMapping 的 Bean 返回。
现在我们要创建一个专门的 HandlerMapping 类来处理 WebSocketMapping 注解,自动完成 handler 的注册:
public class WebSocketMappingHandlerMapping extends SimpleUrlHandlerMapping{ private Map<String, WebSocketHandler> handlerMap = new LinkedHashMap<>(); /** * Register WebSocket handlers annotated by @WebSocketMapping * @throws BeansException */ @Override public void initApplicationContext() throws BeansException { Map<String, Object> beanMap = obtainApplicationContext() .getBeansWithAnnotation(WebSocketMapping.class); beanMap.values().forEach(bean -> { if (!(bean instanceof WebSocketHandler)) { throw new RuntimeException( String.format("Controller [%s] doesn't implement WebSocketHandler interface.", bean.getClass().getName())); } WebSocketMapping annotation = AnnotationUtils.getAnnotation( bean.getClass(), WebSocketMapping.class); //webSocketMapping 映射到管理中 handlerMap.put(Objects.requireNonNull(annotation).value(),(WebSocketHandler) bean); }); super.setOrder(Ordered.HIGHEST_PRECEDENCE); super.setUrlMap(handlerMap); super.initApplicationContext(); } }
我们的 WebSocketMappingHandlerMapping 类,实际上就是 SimpleUrlHandlerMapping,只不过增加了一些初始化的操作。
initApplicationContext()
方法是 Spring 中 ApplicationObjectSupport 类的方法,用于自定义类的初始化行为,在我们的 WebSocketMappingHandlerMapping 中,初始化工作主要是收集使用了 @WebSocketMapping
注解并且实现来 WebSocketHandler
接口的 Component,然后将它们注册到内部的 SimpleUrlHandlerMapping 中。之后的路由工作都是由父类 SimpleUrlHandlerMapping 已实现的功能来完成。
现在,我们只需要返回 WebSocketMappingHandlerMapping 的 Bean,就能自动处理 @WebSocketMapping
注解了:
@Configuration public class WebSocketConfiguration { @Bean public HandlerMapping webSocketMapping() { return new WebSocketMappingHandlerMapping(); } @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }
我们来看下基于 Reactor Netty 的 WebFlux 具体是如何处理 WebSocket 请求的。
前面说过,WebSocket 请求进入 WebFlux 后,首先会从 HandlerMapping 中找到对应的 WebSocketHandler,再由 WebSocketHandlerAdapter 进行实际的调用。这就不再多做阐述,有兴趣的朋友可以去看看WebSocketHandler,WebSocketHandlerAdapter。
我们知道 HTTP 协议是半双工通信,虽然客户端和服务器都能给对方发数据,但是同一时间内只会由一方向另一方发送数据,并且在顺序上是客户端先发送请求,然后才由服务器返回响应数据。所以服务器处理 HTTP 的逻辑很简单,就是每接收到一个客户端请求,就返回一个响应。
而 WebSocket 是全双工通信,客户端和服务器可以随时向另一方发送数据,所以不再是"发送请求、返回响应"的通信方式了。我们上面的 EchoHandler 示例用的仍旧是这一方式,即收到数据后再针对性地返回一条数据,我们下面就来看看如何充分利用 WebSocket 的双向通信。
WebSocket 的处理,主要是通过 session 完成对两个数据流的操作,一个是客户端发给服务器的数据流,一个是服务器发给客户端的数据流:
WebSocketSession 方法 | 描述 |
---|---|
Flux<WebSocketMessage> receive() | 接收来自客户端的数据流,当连接关闭时数据流结束。 |
Mono<Void> send(Publisher<WebSocketMessage>) | 向客户端发送数据流,当数据流结束时,往客户端的写操作也会随之结束,此时返回的 Mono<Void> 会发出一个完成信号。 |
在 WebSocketHandler 中,最后应该将两个数据流的处理结果整合成一个信号流,并返回一个 Mono<Void>
用于表明处理是否结束。
我们分别为两个流定义处理的逻辑:
对于输出流:服务器每秒向客户端发送一个数字;
对于输入流:每当收到客户端消息时,就打印到标准输出
Mono<Void> input = session.receive() .map(WebSocketMessage::getPayloadAsText) .map(msg -> id + ": " + msg) .doOnNext(System.out::println).then(); Mono<Void> output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink))));
这两个处理逻辑互相独立,它们之间没有先后关系,操作执行完之后都是返回一个 Mono<Void>
,但是如何将这两个操作的结果整合成一个信号流返回给 WebFlux 呢?我们可以使用 WebFlux 中的 Mono.zip()
方法:
@Component @WebSocketMapping("/echo") public class EchoHandler implements WebSocketHandler { @Autowired private ConcurrentHashMap<String, WebSocketSender> senderMap; @Override public Mono<Void> handle(WebSocketSession session) { Mono<Void> input = session.receive() .map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg) .doOnNext(System.out::println).then(); Mono<Void> output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink)))); /** * Mono.zip() 会将多个 Mono 合并为一个新的 Mono, * 任何一个 Mono 产生 error 或 complete 都会导致合并后的 Mono * 也随之产生 error 或 complete,此时其它的 Mono 则会被执行取消操作。 */ return Mono.zip(input, output).then(); } }
这里所说的从外部发送数据,指的是需要在 WebSocketHandler 的代码范围之外,在其它地方通过代码调用的方式向 WebSocket 连接发送数据。
思路:在定义 session 的 send()
操作时,通过编程的方式创建 Flux,即使用 Flux.create()
方法创建,将发布 Flux 数据的 FluxSink
暴露出来,并进行保存,然后在需要发送数据的地方,调用 FluxSink<T>
的 next(T data)
方法,向 Flux 的订阅者发布数据。
create 方法是以编程方式创建 Flux 的高级形式,它允许每次产生多个数据,并且可以由多个线程产生。
create 方法将内部的 FluxSink 暴露出来,FluxSink 提供了 next、error、complete 方法。通过 create 方法,可以将响应式堆栈中的 API 与其它 API 进行连接。
考虑这么一个场景:服务器与客户端 A 建立 WebSocket 连接后,允许客户端 B 通过 HTTP 向客户端 A 发送数据。
不考虑安全性、鲁棒性等问题,我们给出一个简单的示例。
首先是 WebSocketHandler 的实现,客户端发送 WebSocket 建立请求时,需要在 query 参数中为当前连接指定一个 id,服务器会以该 id 为键,以对应的 WebSocketSender 为值存放到 senderMap 中:
@Component @WebSocketMapping("/echo") public class EchoHandler implements WebSocketHandler { @Autowired private ConcurrentHashMap<String, WebSocketSender> senderMap; @Override public Mono<Void> handle(WebSocketSession session) { // TODO Auto-generated method stub HandshakeInfo handshakeInfo = session.getHandshakeInfo(); Map<String, String> queryMap = getQueryMap(handshakeInfo.getUri().getQuery()); String id = queryMap.getOrDefault("id", "defaultId"); Mono<Void> input = session.receive().map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg) .doOnNext(System.out::println).then(); Mono<Void> output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink)))); /** * Mono.zip() 会将多个 Mono 合并为一个新的 Mono,任何一个 Mono 产生 error 或 complete 都会导致合并后的 Mono * 也随之产生 error 或 complete,此时其它的 Mono 则会被执行取消操作。 */ return Mono.zip(input, output).then(); } //用于获取url参数 private Map<String, String> getQueryMap(String queryStr) { Map<String, String> queryMap = new HashMap<>(); if (!StringUtils.isEmpty(queryStr)) { String[] queryParam = queryStr.split("&"); Arrays.stream(queryParam).forEach(s -> { String[] kv = s.split("=", 2); String value = kv.length == 2 ? kv[1] : ""; queryMap.put(kv[0], value); }); } return queryMap; } }
其中,senderMap
是我们自己定义的 Bean,在配置文件中定义:
@Configuration public class WebSocketConfiguration { @Bean public HandlerMapping webSocketMapping() { return new WebSocketMappingHandlerMapping(); } @Bean public ConcurrentHashMap<String, WebSocketSender> senderMap() { return new ConcurrentHashMap<String, WebSocketSender>(); } @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }
WebSocketSender
是我们自己创建的类,目的是保存 WebSocket 连接的 session 以及对应的 FluxSink,以便在 WebSocketHandler 代码范围外发送数据:
public class WebSocketSender { private WebSocketSession session; private FluxSink<WebSocketMessage> sink; public WebSocketSender(WebSocketSession session, FluxSink<WebSocketMessage> sink) { this.session = session; this.sink = sink; } public void sendData(String data) { sink.next(session.textMessage(data)); } }
接着我们来实现 HTTP Controller,用户在发起 HTTP 请求时,通过 query 参数指定要通信的 WebSocket 连接 id,以及要发送的数据,然后从 senderMap 中取出对应的 WebSocketSender,调用其 send()
方法向客户端发送数据:
@RestController @RequestMapping("/msg") public class MsgController { @Autowired private ConcurrentHashMap<String, WebSocketSender> senderMap; @RequestMapping("/send") public String sendMessage(@RequestParam String id, @RequestParam String data) { WebSocketSender sender = senderMap.get(id); if (sender != null) { sender.sendData(data); return String.format("Message '%s' sent to connection: %s.", data, id); } else { return String.format("Connection of id '%s' doesn't exist", id); } } }
我这就不再写页面了,直接就用https://www.websocket.org/echo.html进行测试了,结果如下:
这样就算完成了定点推送了,全推送,和部分推送就不再写了,只要从ConcurrentHashMap中取出来去发送就是了。
上述内容就是WebFlux定点推送以及全推送灵活websocket运用是什么,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。