Spring-WebFlux

Spring-WebFlux 相关笔记

Reactive Streams、Reactor 和 Web Flux关系

Reactive Streams 是规范 Reactor 实现了 Reactive Streams Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。

使用 Spring 5 的 WebFlux 开发反应式 Web 应用 https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html

Spring Reactor 入门与实践 https://www.jianshu.com/p/7ee89f70dfe5

Project Reactor https://projectreactor.io/ https://github.com/reactor


WebClient

block()/blockFirst()/blockLast() are blocking, which is not supported in thread

WebClient.post 发送 HTTP 请求报错:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-kqueue-3
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
    |_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
    |_ checkpoint ⇢ HTTP POST "/actuator/route/taskAgent" [ExceptionHandlingWebHandler]
Stack trace:
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
        at reactor.core.publisher.Mono.block(Mono.java:1703)

有时候报错的是 epoll 线程
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-142

原因: 包了两层 mono,block 出错 去掉 block


Reactor Netty

Reactor Netty 是 Spring WebFlux 的底层 HTTP 客户端库,提供异步、非阻塞的 HTTP 请求处理能力。 Reactor Netty 是一个异步的、非阻塞的、基于 Netty 的响应式网络框架。其中,HttpClient 是 Reactor Netty 提供的用于发送 HTTP 请求的响应式客户端。

reactor.netty.http.client.HttpClient

GET 请求

@Test
public void testGet() {
    HttpClient client = HttpClient.create()
                                    .responseTimeout(Duration.ofSeconds(30));
    String resp = client.get()
                        .uri("https://echo.apifox.com/get?key1=value1&key2")
                        .responseContent()
                        .aggregate()
                        .asString()
                        .block();
    System.out.println(resp);
}

POST 请求

@Test
public void testPost() {
    HttpClient client = HttpClient.create()
                                    .responseTimeout(Duration.ofSeconds(30));
    String body = JsonMappers.Normal.toJson(Map.of("k1", "v1", "k2", "v2"));
    String resp = client.post()
                        .uri("https://echo.apifox.com/post")
                        .send(ByteBufFlux.fromString(Mono.just(body)))
                        .responseContent()
                        .aggregate()
                        .asString()
                        .block();
    System.out.println(resp);
}

compress(true) 自动解压缩

HttpClient.create().compress(true) 的作用: ​​自动协商压缩​​:

  • 发出 HTTP 请求时,自动添加请求头:Accept-Encoding: gzip, deflate,表示客户端支持 gzip/deflate 压缩格式(会优先尝试 brotli 如果服务器支持)
  • 如果响应头包含 Content-Encoding: gzip 等压缩标记,HttpClient 自动将压缩内容解压后再交给上层处理