用友iUAP:ReactiveX异步处理数据流
ReactiveX是(Rx,Reactive Extensions)是由微软开发的一种响应式编程模型,最初仅作为LINQ的一个扩展,并在2012年11月开源。作为一种响应式编程模型,Rx的主要作用是为开发者提供一套统一的编程接口,开发者使用LINQ风格的操作符对可观察序列的数据进行处理,从而完成异步数据和基于事件的程序编写。Rx已经支持包括.NET、JavaScript、C++、Java等几乎全部的流行编程语言。
微软自身对其定义是:Rx = Observables + LINQ + Schedulers,Observables用来表示异步数据流,LINQ操作符用来查询异步数据流, Schedulers用来参数化异步数据流的并发处理。Rx很好的结合了观察者模式、迭代器模式和函数式编程。
响应式编程就是与异步数据流交互的编程模式,任何事物都可以成为数据流,例如定义的变量、用户提交的表单、一个按钮的点击事件、缓存等等都是数据流,开发者可以观察数据流的变化并根据变化做出响应。例如,一个由微博订阅事件产生的数据流,开发者监听该数据流并在此基础上可以实现粉丝增加、订阅数统计、获取订阅内容等一系列的数据处理。
数据流可以看作一个按时间排序的时间序列,一般的数据流模型包括三种事件类型:普通事件(值事件)、错误事件、完成事件。观察者订阅该数据流之后,当普通事件发生时调用观察者处理普通事件的相关函数,如表单提交数据写入数据库等;当数据流发生任何错误时会抛出异常,该异常由观察者捕获并在错误处理函数中处理,这类似于try/catch方法;当整个数据流完成时,观察者会执行其数据流完成函数,如提供一些关闭窗口、断开数据库连接等功能。当然,有时候开发者可以忽略错误事件和完成事件而只需聚焦于如何定义和设计在发出值事件时要执行的函数。
Rx提供了一套对数据流进行处理(组合、过滤、转换等)的方法,一个数据流可以作为另一个数据流的输入,甚至多个数据流也可以作为另一个数据流的输入。可以合并两个数据流,也可以过滤一个数据流得到另一个只包含感兴趣的事件的数据流,还可以映射一个数据流的值到一个新的数据流里。所有这些都无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO等,Rx模型让开发者可以像使用集合数据一样操作异步事件流,对异步事件流使用各种简单、可组合的操作,大大降低了开发者处理异步数据的复杂度。
举例来讲,统计一个按钮的双击(或三击以上)次数,可以想象原始的代码实现一定会乱而复杂,需要一些变量来记录状态,并需要一些统计点击事件时间间隔的代码,而使用Rx这些功能的逻辑实现只需简单的4行代码就能实现。
如下图,首先把250ms以内的点击事件放在一个列表中(图中的buffer(clickstream.throttle(250ms))),然后在由map函数将上步转化的列表数据流映射成一个列表长度(即点击次数)的数据流,最后在经过长度大于等于2的条件过滤出最终想要的数据流。然后就可以订阅这个数据流完成响应了。
Rx的优势在于:数据流可组合,对于单层的异步操作来说,Java中的Future对象就可以提供简单而有效的处理方式,但是一旦涉及到嵌套,这种处理方式就开始变得异常繁琐和复杂。Future不能很好的组合带条件的异步执行流程,或许可以用Future.get(),但这样做,异步执行的优势就完全没有了。数据流操作更灵活,Rx的Observable不仅支持处理单独的数据值(就像Future可以做的),也支持数据序列,甚至是无穷的数据流。Observable是一个抽象概念,适用于任何场景。Observable的操作更类似于Iterable,并具有Iterable全部灵活性。可以把Observable当做Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好。使用Observable,在数据准备好时,生产者将数据推送给消费者。数据可以同步或异步的到达,这种方式更灵活。
但是Rx也存在着一些问题,如不能很好的支持回调,由于响应结果一旦就绪Callback就会被调用,它们天生就是高效率的。不过,对于单层的异步执行来说,回调很容易使用,但对于嵌套的异步组合,它们显得非常笨拙。响应式编程正处于发展初期,相信在开源的力量下Rx的发展会越发完善。
用友软件,7*24小时运维服务!服务监督电话:400-660-0566 临沂用友软件销售、培训、服务、临沂用友二次开发,企事业单位信息化管理服务,OA办公系统,联系电话:18669962876 技术服务QQ:1095460234
|