一、前言

《Spring Boot 2.0 WebFlux编程》 一节我们大致了解了WebFlux的用法,这节我们将结合Mongo DB在WebFlux的架构下实现增删改查样例。和 《Spring Boot整合Mongo DB》 不同的是,我们使用的是Reactive Mongo DB依赖,所有增删改查方法返回值类型为Flux或者Mono。

二、项目准备

新建一个Spring Boot项目

2.1 引入webflux和reactive mongodb依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
        <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

2.2 开启Reactive Mongo DB

要开启Reactive Mongo DB的相关配置,需要在Spring Boot启动类上添加@EnableReactiveMongoRepositories注解:

1
2
3
4
5
6
7
8
9
@EnableReactiveMongoRepositories
@SpringBootApplication
public class WebFluxCrudApplication {

public static void main(String[] args) {
SpringApplication.run(WebFluxCrudApplication.class, args);
}

}

2.3 配置application.yml

接着在配置文件application.yml里配置Mongo DB连接:

1
2
3
4
5
6
spring:
data:
mongodb:
host: localhost
port: 27017
database: webflux

使用的是webflux数据库,所以需要在Mongo DB里新建一个webflux数据库(并创建user文档/表,以供待会使用):

三、简单增删改查

3.1 创建实体

创建User实体类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Setter
@Getter
@Document(collection = "user")
public class User {

@Id
private String id;

private String name;

private Integer age;

private String description;
}

3.2 UserDao

创建UserDao接口,继承自ReactiveMongoRepository:

1
2
3
@Repository
public interface UserDao extends ReactiveMongoRepository<User, String> {
}

《Spring Boot整合Mongo DB》 不同的是,我们继承的是ReactiveMongoRepository而非MongoRepository,它所提供的方法都是响应式的:

3.2 UserService

在UserService里通过UserDao定义简单增删改查方法:
UserService.java

1
2
3
4
5
6
7
8
9
public interface UserService {
Flux<User> getUsers();
Mono<User> getUser(String id);
Mono<User> createUser(User user);
Mono<Void> deleteUser(String id);
Mono<User> updateUser(String id, User user);
Flux<User> getUserByCondition(int size, int page, User user);
Mono<Long> getUserByConditionCount(User user);
}

实现类:
UserServiceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Service
public class UserServiceImpl implements UserService {

@Autowired
private UserDao userDao;

@Autowired
private ReactiveMongoTemplate reactiveMongoTemplate;

@Override
public Flux<User> getUsers() {
return userDao.findAll();
}

@Override
public Mono<User> getUser(String id) {
return this.userDao.findById(id);
}

@Override
public Mono<User> createUser(User user) {
return userDao.save(user);
}

@Override
public Mono<Void> deleteUser(String id) {
return this.userDao.findById(id)
.flatMap(user -> this.userDao.delete(user));
}

@Override
public Mono<User> updateUser(String id, User user) {
return this.userDao.findById(id)
.flatMap(u -> {
u.setName(user.getName());
u.setAge(user.getAge());
u.setDescription(user.getDescription());
return this.userDao.save(u);
});
}
}

大致上和 Spring Boot整合Mongo DB 中的UserService差不多,不同的是返回值类型为Flux或者Mono,即它们是响应式非阻塞的方法。

3.3 UserController

编写RESTfulUserController:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@RestController
@RequestMapping("user")
public class UserController {

@Autowired
private UserService userService;

/**
* 以数组的形式一次性返回所有数据
*/
@GetMapping
public Flux<User> getUsers() {
return userService.getUsers();
}

/**
* 以 Server sent events形式多次返回数据
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> getUsersStream() {
return userService.getUsers();
}

@PostMapping
public Mono<User> createUser(User user) {
return userService.createUser(user);
}

/**
* 存在返回 200,不存在返回 404
*/
@DeleteMapping("/{id}")
public Mono<ResponseEntity<Void>> deleteUser(@PathVariable String id) {
return userService.deleteUser(id)
.then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}

/**
* 存在返回修改后的 User
* 不存在返回 404
*/
@PutMapping("/{id}")
public Mono<ResponseEntity<User>> updateUser(@PathVariable String id, User user) {
return userService.updateUser(id, user)
.map(u -> new ResponseEntity<>(u, HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}

/**
* 根据用户 id查找
* 存在返回,不存在返回 404
*/
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
return userService.getUser(id)
.map(user -> new ResponseEntity<>(user, HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
}

对于返回值为Flux类型的方法,推荐定义两个一样的方法,一个以普通形式返回,一个以Server Sent Event的形式返回。对于修改和删除,如果需要修改和删除的用户不存在,我们返回404。

对于Flux和Mono的操作,在 《Spring Boot 2.0 WebFlux编程》 一节中已经介绍过了,这里就不再赘述了。

3.4 测试

增加用户

查找用户

删除用户

四、排序与分页

4.1 Service代码实现

在 Spring Boot整合Mongo DB 一节中,我们通过MongoTemplate实现了排序与分页。与MongoTemplate对于的响应式的对象为ReactiveMongoTemplate,所以我们照葫芦画瓢,仿照MongoTemplate的写法来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
    @Autowired
private ReactiveMongoTemplate reactiveMongoTemplate;

/**
* 分页查询,只返回分页后的数据,count值需要通过 getUserByConditionCount
* 方法获取
*/
@Override
public Flux<User> getUserByCondition(int size, int page, User user) {
Query query = getQuery(user);
Sort sort = Sort.by("age");

Pageable pageable = PageRequest.of(page, size, sort);

return reactiveMongoTemplate.find(query.with(pageable), User.class);
}


/**
* 返回 count,配合 getUserByCondition使用
*/
@Override
public Mono<Long> getUserByConditionCount(User user) {
Query query = getQuery(user);
return reactiveMongoTemplate.count(query, User.class);
}

private Query getQuery(User user) {
Query query = new Query();
Criteria criteria = new Criteria();

if (!StringUtils.isEmpty(user.getName())) {
criteria.and("name").is(user.getName());
}
if (!StringUtils.isEmpty(user.getDescription())) {
criteria.and("description").regex(user.getDescription());
}
query.addCriteria(criteria);
return query;
}

之所以拆分是因为没找到与PageableExecutionUtils类的getPage方法类似的方法,如果是响应式的话,返回值类型应该是Mono<Page>,

4.2 Controller改造

新增以下方法

1
2
3
4
    @GetMapping("/condition")
public Flux<User> getUserByCondition(int size, int page, User user) {
return userService.getUserByCondition(size, page, user);
}

4.3 测试