原创

SpringBoot Webflux Jpa CRUD Demo

引入webflux

<!-- Spring Boot starter that brings in the WebFlux web stack used by this demo. -->
<!-- NOTE(review): the Java code in this article also imports Lombok and Spring Data MongoDB -->
<!-- types; their dependencies are not listed in this snippet. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
package com.weilai.webfluxdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the WebFlux demo application.
 */
@SpringBootApplication
public class WebfluxDemoApplication {

    /**
     * Starts the Spring application using this class as the primary source.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> primarySource = WebfluxDemoApplication.class;
        SpringApplication.run(primarySource, args);
    }
}

@Controller、@RequestMapping

entity

package com.weilai.webfluxdemo.domain;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

/**
 * User document mapped to the {@code user} collection.
 *
 * <p>No accessors are written by hand; other classes in this demo call
 * {@code getName()}, {@code setId(...)} etc., which are supplied by Lombok's
 * {@code @Data}.
 *
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/5 5:13 PM
 */
@Data
@Document("user")
public class User {

    /** Document identifier. */
    @Id
    private String id;

    /** User name; checked against a reserved-name list before being saved. */
    private String name;

    /** User age; used by the age-range queries. */
    private Integer age;
}

repository

package com.weilai.webfluxdemo.domain;

import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;

/**
 * Reactive repository for {@link User} documents with {@code String} ids.
 *
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/5 5:14 PM
 */
@Repository
public interface UserRepository extends ReactiveMongoRepository<User, String> {


    /**
     * Finds users whose age lies between the two bounds.
     *
     * <p>NOTE(review): no explicit query is given, so the query is derived from the
     * method name. Spring Data MongoDB is believed to treat {@code Between} with two
     * plain values as exclusive bounds, unlike {@link #oldUser()} which is inclusive —
     * confirm against the Spring Data MongoDB reference.
     *
     * @param start lower age bound
     * @param end   upper age bound
     * @return the matching users
     */
    Flux<User> findByAgeBetween(int start, int end);

    /**
     * Finds users aged 20 to 30, both bounds inclusive ({@code $gte} / {@code $lte}).
     *
     * @return the matching users
     */
    @Query("{'age':{ '$gte': 20, '$lte' : 30}}")
    Flux<User> oldUser();
}

controller控制器

package com.weilai.webfluxdemo.controller;

import com.weilai.webfluxdemo.domain.User;
import com.weilai.webfluxdemo.domain.UserRepository;
import com.weilai.webfluxdemo.util.CheckUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;


/**
 * Annotation-based REST endpoints for {@link User}, rooted at {@code /users}.
 *
 * <p>Each query endpoint exists twice: once returning the result in a single
 * response, and once under {@code /stream/...} producing
 * {@code text/event-stream} so that items are emitted as server-sent events.
 *
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/5 5:16 PM
 */
@RestController
@RequestMapping("/users")
public class UserController {

    private final UserRepository userRepository;

    @Autowired
    public UserController(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /**
     * Returns all users in one response (as an array).
     *
     * @return every stored user
     */
    @GetMapping("/")
    public Flux<User> getAll() {
        return userRepository.findAll();
    }

    /**
     * Returns all users as server-sent events.
     *
     * @return every stored user
     */
    @GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamGetAll() {
        return userRepository.findAll();
    }

    /**
     * Creates a new user.
     *
     * @param user the user taken from the request body
     * @return the saved user
     */
    @PostMapping("/")
    public Mono<User> createUser(@Validated @RequestBody User user) {
        // The repository's save() serves both create and update: per the original
        // author's note, an entity with an id is updated and one without is inserted.
        // Clearing the id makes this endpoint always create; adjust if clients may
        // supply their own ids.
        user.setId(null);
        final String requestedName = user.getName();
        CheckUtil.checkName(requestedName);
        return userRepository.save(user);
    }

    /**
     * Deletes a user by id: 200 when the user existed, 404 otherwise.
     *
     * @param id id of the user to delete
     * @return an empty response carrying the status
     */
    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<Void>> deleteUser(
            @PathVariable("id") String id) {
        // deleteById has no result to tell whether the document existed, so look
        // the user up first and delete only when it was found.
        final Mono<User> lookup = userRepository.findById(id);
        // flatMap: the step itself yields a Mono (delete). Use map when a value is
        // merely converted.
        return lookup
                .flatMap(existing -> userRepository.delete(existing)
                        .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
                .defaultIfEmpty(notFound());
    }

    /**
     * Updates a user: 200 with the saved user when it exists, 404 otherwise.
     *
     * @param id   id of the user to update
     * @param user new values taken from the request body
     * @return the saved user, or an empty 404 response
     */
    @PutMapping("/{id}")
    public Mono<ResponseEntity<User>> updateUser(@PathVariable("id") String id,
                                                 @Validated @RequestBody User user) {
        CheckUtil.checkName(user.getName());
        return userRepository.findById(id)
                // flatMap: saving yields another Mono.
                .flatMap(stored -> applyChanges(stored, user))
                // map: plain conversion of the saved value into a response.
                .map(saved -> ok(saved))
                .defaultIfEmpty(notFound());
    }

    /**
     * Looks a user up by id: 200 with the user when found, 404 otherwise.
     *
     * @param id id of the user to fetch
     * @return the user, or an empty 404 response
     */
    @GetMapping("/{id}")
    public Mono<ResponseEntity<User>> findUserById(
            @PathVariable("id") String id) {
        final Mono<User> lookup = userRepository.findById(id);
        return lookup
                .map(found -> ok(found))
                .defaultIfEmpty(notFound());
    }

    /**
     * Finds users by age range, returned in one response.
     *
     * @param start lower age bound
     * @param end   upper age bound
     * @return the matching users
     */
    @GetMapping("/age/{start}/{end}")
    public Flux<User> findByAge(@PathVariable("start") int start,
                                @PathVariable("end") int end) {
        return userRepository.findByAgeBetween(start, end);
    }

    /**
     * Finds users by age range, returned as server-sent events.
     *
     * @param start lower age bound
     * @param end   upper age bound
     * @return the matching users
     */
    @GetMapping(value = "/stream/age/{start}/{end}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamFindByAge(@PathVariable("start") int start,
                                      @PathVariable("end") int end) {
        return userRepository.findByAgeBetween(start, end);
    }

    /**
     * Returns users aged 20 to 30 in one response.
     *
     * @return the matching users
     */
    @GetMapping("/old")
    public Flux<User> oldUser() {
        return userRepository.oldUser();
    }

    /**
     * Returns users aged 20 to 30 as server-sent events.
     *
     * @return the matching users
     */
    @GetMapping(value = "/stream/old", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamOldUser() {
        return userRepository.oldUser();
    }

    /** Copies the editable fields from the request onto the stored user and saves it. */
    private Mono<User> applyChanges(User stored, User incoming) {
        stored.setAge(incoming.getAge());
        stored.setName(incoming.getName());
        return userRepository.save(stored);
    }

    /** Builds a 200 response carrying the given body. */
    private static <T> ResponseEntity<T> ok(T body) {
        return new ResponseEntity<>(body, HttpStatus.OK);
    }

    /** Builds a body-less 404 response. */
    private static <T> ResponseEntity<T> notFound() {
        return new ResponseEntity<>(HttpStatus.NOT_FOUND);
    }
}

util参数验证工具类

package com.weilai.webfluxdemo.util;

import com.weilai.webfluxdemo.exceptions.CheckException;

import java.util.stream.Stream;

/**
 * Parameter validation helpers.
 *
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/6 11:46 AM
 */
public class CheckUtil {

    private static final String[] INVALID_NAMES = { "admin", "weilai" };

    /**
     * Rejects names on the reserved list, compared case-insensitively.
     *
     * @param value the name to check; {@code null} matches no reserved name
     * @throws CheckException with field name {@code "name"} and the rejected value
     *                        when the name is reserved
     */
    public static void checkName(String value) {
        for (String reserved : INVALID_NAMES) {
            if (reserved.equalsIgnoreCase(value)) {
                throw new CheckException("name", value);
            }
        }
    }
}

exception自定义异常

package com.weilai.webfluxdemo.exceptions;

/**
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/6 11:46 上午
 */
import lombok.Data;

/**
 * Validation failure that records which field was rejected and with what value.
 *
 * <p>The accessors used by the error handlers ({@code getFieldName()},
 * {@code getFieldValue()}) are not written here; they come from Lombok's
 * {@code @Data}.
 */
@Data
public class CheckException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    /** Name of the field that failed validation. */
    private String fieldName;

    /** Value of the field that failed validation. */
    private String fieldValue;

    /** Creates an exception with no message, cause or field information. */
    public CheckException() {
    }

    /**
     * @param message detail message
     */
    public CheckException(String message) {
        super(message);
    }

    /**
     * @param cause underlying cause
     */
    public CheckException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message detail message
     * @param cause   underlying cause
     */
    public CheckException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param message            detail message
     * @param cause              underlying cause
     * @param enableSuppression  passed through to {@link RuntimeException}
     * @param writableStackTrace passed through to {@link RuntimeException}
     */
    public CheckException(String message, Throwable cause,
                          boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }

    /**
     * Creates an exception describing a rejected field; no detail message is set.
     *
     * @param fieldName  name of the rejected field
     * @param fieldValue rejected value
     */
    public CheckException(String fieldName, String fieldValue) {
        this.fieldName = fieldName;
        this.fieldValue = fieldValue;
    }
}

advice异常处理

package com.weilai.webfluxdemo.advice;

import com.weilai.webfluxdemo.exceptions.CheckException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.bind.support.WebExchangeBindException;

/**
 * Turns validation failures raised by annotated controllers into
 * {@code 400 Bad Request} responses with a plain-text description.
 *
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/6 12:15 PM
 */
@RestControllerAdvice
public class CheckAdvice {

    /**
     * Handles binding/validation errors on request bodies.
     *
     * @param e the binding exception
     * @return a 400 response listing one {@code field:message} entry per line
     */
    @ExceptionHandler(WebExchangeBindException.class)
    public ResponseEntity<String> handleBindException(
            WebExchangeBindException e) {
        return new ResponseEntity<>(toStr(e), HttpStatus.BAD_REQUEST);
    }

    /**
     * Handles the demo's own {@link CheckException}.
     *
     * @param e the validation exception
     * @return a 400 response naming the rejected field and value
     */
    @ExceptionHandler(CheckException.class)
    public ResponseEntity<String> handleCheckException(
            CheckException e) {
        return new ResponseEntity<>(toStr(e), HttpStatus.BAD_REQUEST);
    }

    /**
     * Formats a {@link CheckException} as {@code <field>: 错误的值 <value>}.
     *
     * @param e the validation exception
     * @return the message sent to the client
     */
    private String toStr(CheckException e) {
        return e.getFieldName() + ": 错误的值 " + e.getFieldValue();
    }

    /**
     * Formats binding errors as {@code field:message} entries separated by newlines.
     *
     * @param ex the binding exception
     * @return the joined message, or an empty string when there are no field errors
     */
    private String toStr(WebExchangeBindException ex) {
        return ex.getFieldErrors().stream()
                .map(e -> e.getField() + ":" + e.getDefaultMessage())
                // Reduce without an identity: seeding with "" would prefix the
                // result with a stray newline.
                .reduce((s1, s2) -> s1 + "\n" + s2)
                .orElse("");
    }

}

Router Functions

HandlerFunction(输入ServerRequest返回ServerResponse)

HandlerFunction.java

/**
 * Single-method contract for handling a request: receives a {@code ServerRequest}
 * and returns a {@code Mono} of some {@code ServerResponse} subtype.
 *
 * @param <T> the kind of response produced
 */
@FunctionalInterface
public interface HandlerFunction<T extends ServerResponse> {
    /**
     * Handles the given request.
     *
     * @param request the request to handle
     * @return the response, delivered asynchronously
     */
    Mono<T> handle(ServerRequest request);
}

UserHandler.java

package com.weilai.webfluxdemo.handler;

import com.weilai.webfluxdemo.domain.User;
import com.weilai.webfluxdemo.domain.UserRepository;
import com.weilai.webfluxdemo.util.CheckUtil;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

/**
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/11 3:50 下午
 */
@Component
public class UserHandler {

    private final UserRepository userRepository;

    public UserHandler(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Mono<ServerResponse> hello(ServerRequest request) {
        return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
                .body(BodyInserters.fromValue("Hello, Spring!"));
    }

    /**
     * 获取所有用户
     * @param serverRequest
     * @return
     */
    public Mono<ServerResponse> getAllUser(ServerRequest serverRequest) {
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(userRepository.findAll(), User.class);
    }

    /**
     * 创建用户
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> createUser(ServerRequest request) {
        // 相当与从RequestBody中获取User类,@RequestBody User user
        Mono<User> user = request.bodyToMono(User.class);
        return user.flatMap(u -> {
            // 校验代码需要放在这里
            CheckUtil.checkName(u.getName());
            return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                    .body(userRepository.save(u), User.class);
        });
    }

    /**
     * 根据id删除用户
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> deleteUserById(ServerRequest request) {
        // 获取pathVariable变量id
        String id = request.pathVariable("id");
        return userRepository.findById(id)
                .flatMap(user -> userRepository.delete(user).then(ServerResponse.ok().build()))
                .switchIfEmpty(ServerResponse.notFound().build());
    }
}

RouterFunction(请求URL与HandlerFunction建立关系)

UserRouter.java

package com.weilai.webfluxdemo.router;

import com.weilai.webfluxdemo.handler.UserHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

/**
 * Wires request URLs to the {@link UserHandler} methods.
 *
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/11 4:00 PM
 */
@Configuration
public class UserRouter {

    /**
     * Routes {@code GET /hello} requests accepting {@code text/plain} to
     * {@link UserHandler#hello}.
     */
    @Bean
    public RouterFunction<ServerResponse> testRoute(UserHandler userHandler) {
        return RouterFunctions.route(
                RequestPredicates.GET("/hello").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
                userHandler::hello);
    }

    /**
     * Routes the user operations, all nested under {@code /user}.
     */
    @Bean
    public RouterFunction<ServerResponse> route(UserHandler userHandler) {
        RouterFunction<ServerResponse> userRoutes = RouterFunctions
                // like @GetMapping("/")
                .route(RequestPredicates.GET("/"), userHandler::getAllUser)
                // like @PostMapping(value = "/", produces = MediaType.APPLICATION_JSON_VALUE)
                .andRoute(
                        RequestPredicates.POST("/").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
                        userHandler::createUser)
                // like @DeleteMapping("/{id}")
                .andRoute(RequestPredicates.DELETE("/{id}"), userHandler::deleteUserById);

        // like @RequestMapping("/user") on a controller class
        return RouterFunctions.nest(RequestPredicates.path("/user"), userRoutes);
    }
}

异常处理

package com.weilai.webfluxdemo.handler;

import com.weilai.webfluxdemo.exceptions.CheckException;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebExceptionHandler;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

/**
 * Unified error response for the application: every exception reaching this
 * handler is answered with {@code 400 Bad Request} and a plain-text message.
 *
 * <p>NOTE(review): unknown exceptions are also answered with 400 and their
 * {@code toString()} is sent to the client; consider mapping them to 500 and
 * hiding internal details.
 *
 * @author weilai
 * @email 352342845@qq.com
 * @date 2020/8/11 4:25 PM
 */
@Component
// Original author's note: the order must be -2 or lower for this handler to take
// effect (presumably so that it runs before Spring Boot's default error handler —
// confirm).
@Order(-2)
public class ExceptionHandler implements WebExceptionHandler {

    /**
     * Writes the error response for the given exception.
     *
     * @param exchange the current exchange
     * @param ex       the exception to report
     * @return completion signal of the response write
     */
    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.BAD_REQUEST);
        // Declare the charset and encode with the same one: the message can contain
        // arbitrary user-supplied field values, and getBytes() without a charset
        // depends on the platform default.
        response.getHeaders().setContentType(
                new MediaType(MediaType.TEXT_PLAIN, StandardCharsets.UTF_8));

        byte[] payload = toStr(ex).getBytes(StandardCharsets.UTF_8);
        DataBuffer db = response.bufferFactory().wrap(payload);

        return response.writeWith(Mono.just(db));
    }

    /**
     * Builds the message sent to the client.
     *
     * @param ex the exception to describe
     * @return field name and rejected value for a {@link CheckException}, otherwise
     *         the exception's {@code toString()}
     */
    private String toStr(Throwable ex) {
        // Known validation failure: report the field and the rejected value.
        if (ex instanceof CheckException) {
            CheckException e = (CheckException) ex;
            return e.getFieldName() + ": invalid value " + e.getFieldValue();
        }
        // Unknown failure: print the stack trace so the problem can be located.
        // NOTE(review): no logger is available in this file; prefer one over
        // printStackTrace() if the project has it.
        ex.printStackTrace();
        return ex.toString();
    }

}
正文到此结束
本文目录