• 学习webflux响应式前需要学习三个基础:
  1. 函数式编程和lambda表达式
  2. Stream流编程
  3. Reactive stream 响应式流
  4. 实战开发
  • 接下来进入学习系列四

Webflux响应式编程实战开发

先来一张图,这是spring文档的一张截图,介绍了spring如今的两种开发模式,MVC和webflux两种开发模式,可见webflux的重要性

20181117163117958.png

1. 初识SpringWebFlux

webflux 是spring5推出的一种响应式Web框架,它是一种非阻塞的开发模式,可以在一个线程里处理多个请求(非阻塞),运行在netty环境,也可以可以运行在servlet3.1之后的容器,支持异步servlet, 可以支持更高的并发量

2. 异步servlet

  • 我们知道同步servlet阻塞了Tomcat容器的线程,当一个网络请求到我们的Tomcat容器之后,容器会给每个请求启动一个线程去处理,线程里面会调用一个servlet去处理,当使用同步servlet时,业务代码花多长时间,你的线程就要等待多长时间,这就是堵塞(同步和异步是服务器后台才有异步这个概念,对于浏览器来说所有的请求都是异步,前台都要花费业务逻辑时间)
  • 异步servlet的主要作用是它不会堵塞Tomcat容器的servlet线程,它可以把一些耗时的操作放在一个独立的线程池,那么我们的servlet就可以立马返回,处理下一个请求,以此就可以达到高并发。
    通过代码比较一下同步servlet与异步servlet

同步servlet

@WebServlet(urlPatterns = "/SyncServlet")
public class SyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    public SyncServlet() {
        super();
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        doGet(request, response);
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();
        // 执行业务代码
        doSomeThing(request, response);
        System.out.println("sync use:" + (System.currentTimeMillis() - t1));
    }
    private void doSomeThing(HttpServletRequest request,
                             HttpServletResponse response) throws IOException {
        // 模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        response.getWriter().append("done");
    }
}

异步servlet

@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    public AsyncServlet() {
        super();
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        doGet(request, response);
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 开启异步
        AsyncContext asyncContext = request.startAsync();

        // 执行业务代码,放入一个线程池里
        CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
                asyncContext.getRequest(), asyncContext.getResponse()));

        System.out.println("async use:" + (System.currentTimeMillis() - t1));
    }
    private void doSomeThing(AsyncContext asyncContext,
                             ServletRequest servletRequest, ServletResponse servletResponse) {

        // 模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            servletResponse.getWriter().append("done");
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 业务代码处理完毕, 通知结束
        asyncContext.complete();
    }
}

20181218173222926.png

  • 通过以上两段代码控制台的打印结果可以看出,异步servlet把耗时操作放在一个独立的线程池,那么我们的servlet就可以立马返回,处理下一个请求。

3. CRUD完整示例

  • 通过下图可以看出MVC和wenflux的区别
    20181117165349636.png

  • 以下通过一个例子了解一下webflux开发

  1. 实体类
@Document(collection = "user")
@Data
public class User {

	@Id
	private String id;

	@NotBlank
	private String name;

	@Range(min=10, max=100)
	private int age;

}
  1. Controller层
@RestController
@RequestMapping("/user")
public class UserController {

	private final UserRepository repository;

	public UserController(UserRepository repository) {
		this.repository = repository;
	}

	/**
	 * 以数组形式一次性返回数据
	 */
	@GetMapping("/")
	public Flux<User> getAll() {
		return repository.findAll();
	}

	/**
	 * 以SSE形式多次返回数据
	 */
	@GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<User> streamGetAll() {
		return repository.findAll();
	}

	/**
	 * 新增数据
	 */
	@PostMapping("/")
	public Mono<User> createUser(@Valid @RequestBody User user) {
		// spring data jpa 里面, 新增和修改都是save. 有id是修改, id为空是新增
		// 根据实际情况是否置空id
		user.setId(null);
		CheckUtil.checkName(user.getName());
		return this.repository.save(user);
	}

	/**
	 * 根据id删除用户 存在的时候返回200, 不存在返回404
	 */
	@DeleteMapping("/{id}")
	public Mono<ResponseEntity<Void>> deleteUser(
			@PathVariable("id") String id) {
		// deletebyID 没有返回值, 不能判断数据是否存在
		// this.repository.deleteById(id)
		return this.repository.findById(id)
				// 当你要操作数据, 并返回一个Mono 这个时候使用flatMap
				// 如果不操作数据, 只是转换数据, 使用map
				.flatMap(user -> this.repository.delete(user).then(
						Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
				.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
	}

	/**
	 * 修改数据 存在的时候返回200 和修改后的数据, 不存在的时候返回404
	 */
	@PutMapping("/{id}")
	public Mono<ResponseEntity<User>> updateUser(@PathVariable("id") String id,
			@Valid @RequestBody User user) {
		CheckUtil.checkName(user.getName());
		return this.repository.findById(id)
				// flatMap 操作数据
				.flatMap(u -> {
					u.setAge(user.getAge());
					u.setName(user.getName());
					return this.repository.save(u);
				})
				// map: 转换数据
				.map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
				.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
	}

	/**
	 * 根据ID查找用户 存在返回用户信息, 不存在返回404
	 */
	@GetMapping("/{id}")
	public Mono<ResponseEntity<User>> findUserById(
			@PathVariable("id") String id) {
		return this.repository.findById(id)
				.map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
				.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
	}

	/**
	 * 根据年龄查找用户
	 */
	@GetMapping("/age/{start}/{end}")
	public Flux<User> findByAge(@PathVariable("start") int start,
			@PathVariable("end") int end) {
		return this.repository.findByAgeBetween(start, end);
	}

	/**
	 * 根据年龄查找用户
	 */
	@GetMapping(value = "/stream/age/{start}/{end}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<User> streamFindByAge(@PathVariable("start") int start,
			@PathVariable("end") int end) {
		return this.repository.findByAgeBetween(start, end);
	}
	
	/**
	 *  得到20-30用户
	 */
	@GetMapping("/old")

	public Flux<User> oldUser() {
		return this.repository.oldUser();
	}

	/**
	 * 得到20-30用户
	 */
	@GetMapping(value = "/stream/old", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<User> streamOldUser() {
		return this.repository.oldUser();
	}

}
  1. Repository层
@Repository
public interface UserRepository extends ReactiveMongoRepository<User, String> {

	/**
	 * 根据年龄查找用户
	 */
	Flux<User> findByAgeBetween(int start, int end);
	
	@Query("{'age':{ '$gte': 20, '$lte' : 30}}")
	Flux<User> oldUser();
}
  • 以上代码没有进行校验,当然没有校验的代码是不能用的,校验代码我就不放了,想了解的GitHub上有完整代码。

标题:(4)实战开发——Webflux响应式编程利器
作者:AlgerFan
地址:https://www.algerfan.cn/articles/2018/12/16/1544342714012.html
版权声明:本文为博主原创文章,转载请附上博文链接!

添加新评论