暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

响应式编程(上):总览

未来技术站 2017-11-14
176


作者 | Emac

杏仁医生架构师兼平台组负责人,关注微服务、DevOps领域。


引子:被誉为“中国大数据第一人”的涂子沛先生在其成名作《数据之巅》里提到,摩尔定律、社交媒体、数据挖掘是大数据的三大成因。IBM 的研究称,整个人类文明所获得的全部数据中,有 90% 是过去两年内产生的。在此背景下,包括 NoSQL,HadoopSparkStormKylin 在内的大批新技术应运而生。其中以 RxJava 和 Reactor 为代表的响应式(Reactive)编程技术针对的就是经典的大数据 4V 定义(Volume,Variety,Velocity,Value)中的 Velocity,即高并发问题,而在刚刚发布的 Spring 5 中,也引入了响应式编程的支持。我将分上下两篇与你分享与响应式编程有关的一些学习心得。本篇是上篇,以 Reactor 框架为例介绍响应式编程的几个关键特性。

1. 响应式宣言

和敏捷宣言一样,说起响应式编程,必先提到响应式宣言。

We want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems. - The Reactive Manifesto


图片出处:The Reactive Manifesto


不知道是不是为了向敏捷宣言致敬,响应式宣言中也包含了 4 组关键词:

  • Responsive: 可响应的。要求系统尽可能做到在任何时候都能及时响应。

  • Resilient: 可恢复的。要求系统即使出错了,也能保持可响应性。

  • Elastic: 可伸缩的。要求系统在各种负载下都能保持可响应性。

  • Message Driven: 消息驱动的。要求系统通过异步消息连接各个组件。


可以看到,对于任何一个响应式系统,首先要保证的就是可响应性,否则就称不上是响应式系统。从这个意义上来说,动不动就蓝屏的 Windows 系统显然不是一个响应式系统。

PS: 如果你赞同响应式宣言,不妨到官网上留下的你电子签名,我的编号是 18989,试试看能不能找到我。

2. 响应式编程

In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. - Reactive programming - Wikipedia


在上述响应式编程(后面简称 RP)的定义中,除了异步编程,还包含两个重要的关键词:

  • Data streams:即数据流,分为静态数据流(比如数组,文件)和动态数据流(比如事件流,日志流)两种。基于数据流模型,RP 得以提供一套统一的 Stream 风格的数据处理接口。和 Java 8 中的 Stream API 相比,RP API 除了支持静态数据流,还支持动态数据流,并且允许复用和同时接入多个订阅者。

  • The propagation of change:变化传播,简单来说就是以一个数据流为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。这就有点像函数式编程中的组合函数,将多个函数串联起来,把一组输入数据转化为格式迥异的输出数据。


一个容易混淆的概念是响应式设计,虽然它的名字中也包含了“响应式”三个字,但其实和 RP 完全是两码事。响应式设计是指网页能够自动调整布局和样式以适配不同尺寸的屏幕,属于网站设计的范畴,而 RP 是一种关注系统可响应性,面向数据流的编程思想或者说编程框架。

特性

从本质上说,RP 是一种异步编程框架,和其他框架相比,RP 至少包含了以下三个特性:

  • 描述而非执行:在你最终调用 subscribe()
     方法之前,从发布端到订阅端,没有任何事会发生。就好比无论多长的水管,只要水龙头不打开,水管里的水就不会流动。为了提高描述能力,RP 提供了比 Stream 丰富的多的多的API,比如 buffer()
    , merge()
    , onErrorMap()
     等。

  • 提高吞吐量: 类似于 HTTP/2 中的连接复用,RP 通过线程复用来提高吞吐量。在传统的Servlet容器中,每来一个请求就会发起一个线程进行处理。受限于机器硬件资源,单台服务器所能支撑的线程数是存在一个上限的,假设为T,那么应用同时能处理的请求数(吞吐量)必然也不会超过T。但对于一个使用 Spring 5 开发的 RP 应用,如果运行在像 Netty 这样的异步容器中,无论有多少个请求,用于处理请求的线程数是相对固定的,因此最大吞吐量就有可能超过T。

  • 背压(Backpressure)支持:简单来说,背压就是一种反馈机制。在一般的 Push 模型中,发布者既不知道也不关心订阅者的处理速度,当数据的发布速度超过处理速度时,需要订阅者自己决定是缓存还是丢弃。如果使用 RP,决定权就交回给发布者,订阅者只需要根据自己的处理能力问发布者请求相应数量的数据。你可能会问这不就是 Pull 模型吗?其实是不同的。在 Pull 模型中,订阅者每次处理完数据,都要重新发起一次请求拉取新的数据,而使用背压,订阅者只需要发起一次请求,就能连续不断的重复请求数据。

适用场景

了解了 RP 的这些特性,你可能已经猜想到 RP 有哪些适用场景了。一般来说,RP 适用于高并发、带延迟操作的场景,比如以下这些情况(的组合):

  • 一次请求涉及多次外部服务调用

  • 非可靠的网络传输

  • 高并发下的消息处理

  • 弹性计算网络

代价

Every coin has two sides.

和任何框架一样,有优势必然就有劣势。RP 的两个比较大的问题是:

  • 虽然复用线程有助于提高吞吐量,但一旦在某个回调函数中线程被卡住,那么这个线程上所有的请求都会被阻塞,最严重的情况,整个应用会被拖垮。

  • 难以调试。由于 RP 强大的描述能力,在一个典型的 RP 应用中,大部分代码都是以链式表达式的形式出现,比如flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe()
    ,一旦出错,你将很难定位到具体是哪个环节出了问题。所幸的是,RP 框架一般都会提供一些工具方法来辅助进行调试。

3. Reactor 实战

为了帮助你理解上面说的一些概念,下面我就通过几个测试用例,演示 RP 的两个关键特性:提高吞吐量和背压。完整的代码可参见我 GitHub 上的示例工程。

提高吞吐量

    @Test
   public void testImperative() throws InterruptedException { _runInParallel(CONCURRENT_SIZE, () -> { ImperativeRestaurantRepository.INSTANCE.insert(load); }); } private void _runInParallel(int nThreads, Runnable task) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(nThreads); for (int i = 0; i < nThreads; i++) { executorService.submit(task); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.MINUTES); } @Test public void testReactive() throws InterruptedException { CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE); for (int i = 0; i < CONCURRENT_SIZE; i++) { ReactiveRestaurantRepository.INSTANCE.insert(load).subscribe(s -> { }, e -> latch.countDown(), latch::countDown); } latch.await(); }


用例解读:

  • 第一个测试用例使用的是多线程 + MongoDB Driver,同时起 100 个线程,每个线程往 MongoDB 中插入 10000 条数据,总共 100 万条数据,平均用时15秒左右。

  • 第二个测试用例使用的是 Reactor + MongoDB Reactive Streams Driver,同样是插入 100 万条数据,平均用时不到 10 秒,吞吐量提高了 50%!

背压

在演示测试用例之前,先看两张图,帮助你更形象的理解什么是背压。



图片出处:Dataflow and simplified reactive programming


两张图乍一看没啥区别,但其实是完全两种不同的背压策略。第一张图,发布速度(100/s)远大于订阅速度(1/s),但由于背压的关系,发布者严格按照订阅者的请求数量发送数据。第二张图,发布速度(1/s)小于订阅速度(100/s),当订阅者请求100个数据时,发布者会积满所需个数的数据再开始发送。可以看到,通过背压机制,发布者可以根据各个订阅者的能力动态调整发布速度。


    @BeforeEach
   public void beforeEach() { // initialize publisher AtomicInteger count = new AtomicInteger(); timerPublisher = Flux.create(s -> new Timer().schedule(new TimerTask() { @Override public void run() { s.next(count.getAndIncrement()); if (count.get() == 10) { s.complete(); } } }, 100, 100) ); } @Test public void testNormal() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); timerPublisher .subscribe(r -> System.out.println("Continuous consuming " + r), e -> latch.countDown(), latch::countDown); latch.await(); } @Test public void testBackpressure() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicReference<Subscription> timerSubscription = new AtomicReference<>(); Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() { @Override protected void hookOnSubscribe(Subscription subscription) { timerSubscription.set(subscription); } @Override protected void hookOnNext(Integer value) { System.out.println("consuming " + value); } @Override protected void hookOnComplete() { latch.countDown(); } @Override protected void hookOnError(Throwable throwable) { latch.countDown(); } }; timerPublisher.onBackpressureDrop().subscribe(subscriber); new Timer().schedule(new TimerTask() { @Override public void run() { timerSubscription.get().request(1); } }, 100, 200); latch.await(); }


用例解读:

  • 第一个测试用例演示了在理想情况下,即订阅者的处理速度能够跟上发布者的发布速度(以 100ms 为间隔产生 10 个数字),控制台从 0 打印到 9,一共 10 个数字,和发布端一致。

  • 第二个测试用例故意调慢了订阅者的处理速度(每 200ms 处理一个数字),同时发布者采用了 Drop 的背压策略,结果控制台只打印了一半的数字(0,2,4,6,8),另外一半的数字由于背压的原因被发布者 Drop 掉了,并没有发给订阅者。

4 小结

通过上面的介绍,不难看出 RP 实际上是一种内置了发布者订阅者模型的异步编程框架,包含了线程复用,背压等高级特性,特别适用于高并发、有延迟的场景。

下篇我将对刚刚发布的 Spring 5 中有关响应式编程的支持做一些简单介绍,并详解一个完整的 Spring 5 示例应用,敬请期待。

5 参考

  • Understanding Reactive types:https://spring.io/blog/2016/04/19/understanding-reactive-types

  • Designing, Implementing, and Using Reactive APIs:https://www.slideshare.net/SpringCentral/designing-implementing-and-using-reactive-apis

  • Imperative to Reactive Web Applications:https://www.slideshare.net/SpringCentral/imperative-to-reactive-web-applications


全文完



以下文章您可能也会感兴趣:

  • 乐高式微服务化改造(上)

  • 乐高式微服务化改造(下)

  • 一个创业公司的容器化之路(一) - 容器化之前

  • 一个创业公司的容器化之路(二) - 容器化

  • 一个创业公司的容器化之路(三) - 容器即未来

  • 四维阅读法 - 我的高效学习“秘技”

  • 工程师成长的必备技能

  • iOS 屏幕适配浅谈

  • Web 与 App 数据交互原理和实现


我们正在招聘 Java 工程师,欢迎有兴趣的同学投递简历到 rd-hr@xingren.com 。



杏仁技术站


长按左侧二维码关注我们,这里有一群热血青年期待着与您相会。




数据库
文章转载自 未来技术站,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论

PHP网站源码光明百度网站优化松岗网站改版福田网站建设盐田百搜词包民治建站沙井百度爱采购观澜如何制作网站大浪外贸网站设计沙井网站优化按天扣费塘坑品牌网站设计横岗网站开发永湖高端网站设计龙华推广网站坪地关键词排名包年推广南联至尊标王光明外贸网站建设罗湖网站制作松岗网站推广工具塘坑网站推广系统西乡网站建设丹竹头网站推广系统盐田seo网站优化大芬网站优化按天扣费丹竹头建网站东莞网络推广龙华seo优化福田优秀网站设计观澜模板制作南澳百姓网标王推广双龙标王歼20紧急升空逼退外机英媒称团队夜以继日筹划王妃复出草木蔓发 春山在望成都发生巨响 当地回应60岁老人炒菠菜未焯水致肾病恶化男子涉嫌走私被判11年却一天牢没坐劳斯莱斯右转逼停直行车网传落水者说“没让你救”系谣言广东通报13岁男孩性侵女童不予立案贵州小伙回应在美国卖三蹦子火了淀粉肠小王子日销售额涨超10倍有个姐真把千机伞做出来了近3万元金手镯仅含足金十克呼北高速交通事故已致14人死亡杨洋拄拐现身医院国产伟哥去年销售近13亿男子给前妻转账 现任妻子起诉要回新基金只募集到26元还是员工自购男孩疑遭霸凌 家长讨说法被踢出群充个话费竟沦为间接洗钱工具新的一天从800个哈欠开始单亲妈妈陷入热恋 14岁儿子报警#春分立蛋大挑战#中国投资客涌入日本东京买房两大学生合买彩票中奖一人不认账新加坡主帅:唯一目标击败中国队月嫂回应掌掴婴儿是在赶虫子19岁小伙救下5人后溺亡 多方发声清明节放假3天调休1天张家界的山上“长”满了韩国人?开封王婆为何火了主播靠辱骂母亲走红被批捕封号代拍被何赛飞拿着魔杖追着打阿根廷将发行1万与2万面值的纸币库克现身上海为江西彩礼“减负”的“试婚人”因自嘲式简历走红的教授更新简介殡仪馆花卉高于市场价3倍还重复用网友称在豆瓣酱里吃出老鼠头315晚会后胖东来又人满为患了网友建议重庆地铁不准乘客携带菜筐特朗普谈“凯特王妃P图照”罗斯否认插足凯特王妃婚姻青海通报栏杆断裂小学生跌落住进ICU恒大被罚41.75亿到底怎么缴湖南一县政协主席疑涉刑案被控制茶百道就改标签日期致歉王树国3次鞠躬告别西交大师生张立群任西安交通大学校长杨倩无缘巴黎奥运

PHP网站源码 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化