原文由 Trisha Gee 在2019年12月13日发布在 INTELLIJ IDEA BLOG
在这一节,我们添加一个RSocket客户端,用来连接上一节创建的RSocket服务器。
现在,我们有了一个使用Spring WebClient的端到端应用程序。在上一节中,我们介绍了一个新的RSocket服务器,在这节,我们将看到如何创建一个客户端来连接它
创建一个集成测试
与WebClientStockClient
一样,我们将通过集成测试来驱动RSocket客户端,测试看起来与WebClientStockClientIntegrationTest
几乎相同。
- 所以让我们复制这个测试并将其重命名为
RSocketStockClientIntegrationTest
。 - 将变量
WebClientStockClient
改为RSocketStockClient
并重命名为rSocketStockClient
。 - (提示:使用IntelliJ IDEA的rename refactoring来重命名,会将其余地方用到的这个变量都重命名了,不需要查找和替换)。
- 我们知道这不需要WebClient,因为
WebClientStockClient
才需要的。移除构造函数参数和字段声明。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class RSocketStockClientIntegrationTest {
@Test
void shouldRetrieveStockPricesFromTheService() {
// given
RSocketStockClient rSocketStockClient = new RSocketStockClient();
// when
Flux<StockPrice> prices = rSocketStockClient.pricesFor("SYMBOL");
// then
Assertions.assertNotNull(prices);
Flux<StockPrice> fivePrices = prices.take(5);
Assertions.assertEquals(5, fivePrices.count().block());
Assertions.assertEquals("SYMBOL", fivePrices.blockFirst().getSymbol());
}
}
(注意:这代码现在还未能通过编译)
创建 RSocket 客户端
RSocketStockClient
还不存在,让我们创建一个空类。- (提示:在红色的
RSocketStockClient
代码按下 Alt+Enter会给我们选择创建这个类) - 测试假定要一个
pricesFor
方法,所以在RSocketStockClient
里面创建这个缺少的方法。 - (提示:在
RSocketStockClient
里红色的pricesFor
方法按下Alt+Enter会给我们创建这个方法的选择,并且有正确的方法签名。)
1
2
3
4
5
public class RSocketStockClient {
public Flux<StockPrice> pricesFor(String symbol) {
return null;
}
}
引入 StockClient 接口
当然方法声明看起来跟在WebClientStockClient
里面的一样,所以这是引入接口的好时机,让两个客户端都实现同样的接口。
- 创建一个接口
StockClient
我们希望pricesFor
方法出现在接口上,因为这个方法在两个客户端类的方法签名一样的。 - (提示:在
WebClientStockClient
上使用IntelliJ IDEA的Extract Interface 功能,可以自动地创建一个带有pricesFor
方法的接口。)
1
2
3
public interface StockClient {
Flux<StockPrice> pricesFor(String symbol);
}
- 确保
WebClientStockClient
已经更新为实现新的StockClient
接口了,并且添加了@Override
注解到pricesFor
方法上。
1
2
3
4
5
6
7
8
public class WebClientStockClient implements StockClient {
// 这里进行初始化...
@Override
public Flux<StockPrice> pricesFor(String symbol) {
// 这里是实现
}
}
- 对
RSocketStockClient
也是同样的操作
1
2
3
4
5
6
public class RSocketStockClient implements StockClient {
@Override
public Flux<StockPrice> pricesFor(String symbol) {
return null;
}
}
- 目前测试能通过编译,运行它看到不能通过测试。它失败的原因应该是在
assertNotNull
断言上,因为我们从pricesFor
方法返回null。
实现RSocket链接
通常 在测试驱动开发中,我们会采取一些小步骤来使测试通过,然后再有更详细的测试。在本课程中我们将直接进入并实现能用的RSocket客户端。
- 在stock-client模块添加一个spring-boot-starter-rsocket依赖到pom.xml文件
1
2
3
4
5
6
7
8
9
10
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<!-- more dependencies... -->
- 添加一个类型为
RSocketRequester
的字段rSocketRequester
到RSocketStockClient
。 - 为它添加一个构造函数参数
- (提示:IntelliJ IDEA可以为字段自动生构造函数参数)
1
2
3
4
5
6
7
8
9
public class RSocketStockClient implements StockClient {
private RSocketRequester rSocketRequester;
public RSocketStockClient(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
// pricesFor 方法...
}
- 在
pricesFor
方法,调用rSocketRequester.route
。 对于路由,我们想要使用跟在后端RSocket服务定义的相同,在我们的例子中是“stockPrices”。 - 通过
data
方法向服务器发送一个股票代号。 - 我们期望调用返回一个股票价格的
Flux
,所以将StockPrice.class
传入到retrieveFlux
方法。 - 返回这些调用
pricesFor
方法的结果,而不是null
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RSocketStockClient implements StockClient {
private RSocketRequester rSocketRequester;
public RSocketStockClient(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@Override
public Flux<StockPrice> pricesFor(String symbol) {
return rSocketRequester.route("stockPrices")
.data(symbol)
.retrieveFlux(StockPrice.class);
}
}
创建一个 RSocketRequester
测试代码不能通过编译,因为我们添加了一个rSocketRequester
到构造函数参数,然而在测试里我们没有RSocketRequester
实例。
- 在测试里创建一个名为
createRSocketRequester
的私有方法,放在上方靠近其它对象初始化的位置。 - 为
RSocketRequester.Builder
创建一个字段。如果我们添加@Autowired
注解,Spring将会为我们的测试注入一个实例。 - 要让Spring管理我们的测试,我们需要将它注解为
@SpringBootTest
。 - 在
createRSocketRequester
里面,使用rSocketRequester
通过TCP连接到我们的RSocket服务器,它运行在localhost的7000端口。 - 调用
block
直到连接上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest
class RSocketStockClientIntegrationTest {
@Autowired
private RSocketRequester.Builder builder;
private RSocketRequester createRSocketRequester() {
return builder.connectTcp("localhost", 7000).block();
}
@Test
void shouldRetrieveStockPricesFromTheService() {
// implementation...
}
}
通过集成测试
我们期待这个测试能运行,但我们错过了一些重要的东西。我们发现个错误,说missing a SpringBootConfiguration看起来可能有点令人费解。实际上,这个模块并没有任何SpringBootApplication。因为这是用于作为一个库,给其它应用代码共享代码的,它本身不是一个应用程序。让我们看一种解决方案让测试跑起来。
- 在test目录里创建一个类
TestApplication
。 - 将它注解为
@SpringBootApplication
。 - 重新运行集成测试,所有东西应该按预期启动,并且测试应该通过了。
1
2
3
@SpringBootApplication
public class TestApplication {
}
使用 StepVerifier 进行测试
一旦测试通过了,我们可以假设客户端已经成功地通过RSocket连接到了服务器,获取一个Flux
的StockPrice
对象,可以去其中的前五个,然后检查第一个是否有正确的股票代码。这是稍微简单的测试响应式应用程序的方式。还有其它的方式,其中一种就是使用StepVerifier使用这种方式,我们可以编写我们期望看到的事件。
- 创建一个新的StepVerifier从prices Flux里面取5个价格。
- 使用expectNextMatches检查所有5个股票价格对应的股票代码是正确的。
- 调用去检查不仅这些期望达到了,并且在这5个之后没有更多的
StockPrice
对象了。 - 删除旧的断言(StepVerifier将它们全部替换了)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
void shouldRetrieveStockPricesFromTheService() {
// given
RSocketStockClient rSocketStockClient = new RSocketStockClient(createRSocketRequester());
// when
Flux<StockPrice> prices = rSocketStockClient.pricesFor("SYMBOL");
// then
StepVerifier.create(prices.take(5))
.expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
.expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
.expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
.expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
.expectNextMatches(stockPrice -> stockPrice.getSymbol().equals("SYMBOL"))
.verifyComplete();
}
This approach can support much more than this simple example, and is also very useful for testing time-based publishers like ours. 这种方式可以支持比这个简单例子更多的操作,而且对于测试像我们这种基于时间的发布者很有用。
添加重试退避已经错误处理策略
我们还有最后一件事要考虑。我们的WebClientStockClient
定义了一个退避重试策略,以及简单的错误处理方法。实际上我们对于RSocketStockClient
也采取同样的方式。
- 从
WebClientStockClient
复制retryBackoff
和doOnError
步骤并粘贴到RSocketStockClient.pricesFor
里面。 - 重新运行测试,它应该还能通过的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Log4j2
public class RSocketStockClient implements StockClient {
private RSocketRequester rSocketRequester;
public RSocketStockClient(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@Override
public Flux<StockPrice> pricesFor(String symbol) {
return rSocketRequester.route("stockPrices")
.data(symbol)
.retrieveFlux(StockPrice.class)
.retryBackoff(5, Duration.ofSeconds(1), Duration.ofSeconds(20))
.doOnError(IOException.class, e -> log.error(e.getMessage()));
}
}
现在我们在后端有了发送股票价格的RSocket服务器,以及能够连接到它并查看价格的RSocket客户端。在下一节,我们会看一下如何从使用WebClientStockClient
切换到RSocketStockClient
。
全部代码在 GitHub:https://github.com/zwt-io/rsb/