开发者

Kotlin下Rxjava的基础用法及流式调用示例详解

开发者 https://www.devze.com 2022-12-01 10:32 出处:网络 作者: i听风逝夜
目录前言基础用法fromXXXcreateintervaltimer指定线程observeOnsubscribeOnFlowable流式调用背压前言万事开头难,写文章也是,现在越来越不知道开头...
目录
  • 前言
  • 基础用法
    • fromXXX
    • create
    • interval & timer
    • 指定线程
    • observeOn & subscribeOn
  • Flowable
    • 流式调用
      • 背压

        前言

        万事开头难,写文章也是,现在越来越不知道开头怎么写了,所以在前言中,简单介绍下RxJava吧,第一次听说还是以前做android开发的时候,那时候好多库中都使用了Rxjava,而在网络请求中,也有很多都是使用Rxjava去写,但自己却没怎么在项目中写过,而在搜索资料中发现,微信中搜rxjava时,最多介绍他的还是Android开发者,所以今天来记录下。

        js

        而所谓的响应式编程,就是一种用于应用程序异步编程的技术,他是一个通用的思想,类似与AOP,不只是在java中才有。他专注于对数据的变化做出反应,例如,有一个数据源(这里被称为生产者),一个数据目标(这里被成为消费者),然后在将消费者连接到订阅者之后,响应式编程框架负责将生产者生产的数据推送给消费者,一个可观察对象可以有任意数量的订阅者。

        而对于一些思想上的框架,类似于Spring,源码上大体还是比较难的,毕竟就算是人,在思想上跨越也是有难度的,但对于RxJava来说,源码也不是很多,所以在以后会尝试介绍他的源码实现,而使用Rxjava的好处不是在于实现了什么具体的技术功能,比如使用CGLIB可以实现动态代理的技术,使用JDBC可以进行数据查询,而没有rxjava,我们的代码还可以借助Java8的Stream、CompletableFuture来实现。

        而rxjava的好处在于让代码更简洁、优雅,通过他的链式调用,消除嵌套等。

        在下面的例子中,我们会使用Kotlin来做示范。

        基础用法

        在这里,Observable 字面意思是可观察者,他表示数据源,通常,一旦订阅者开始收听,他们就会开始提供数据,而just表示仅仅,仅仅生产的数据是一个"T",即泛型类型,在这里是String。

        而subscribe表示订阅,当订阅后,他会收到Observable生产的数据,来消费。

        fun main() {
             Observable.just("hello rxjava").subscribe {
                 println(it)
             }
        }
        输出:
        hello rxjava
        

        fromXXX

        而上面说到,just表示仅仅,在rxjava中,不仅仅是具体的数据,还可以是Callable、Array、Future对象等,详细可以看fromXXX等方法,最终的结果由rxjava调用后如Callable的结果后,传递给订阅者。

        fun main() {
            Observable.fromCallable {
                println("callable")
                "hello rxjava"
            }.subscribe {
                println(it)
            }
        }
        

        create

        这个方法给我了我们手动执行的能力,即传递数据到订阅者是我们手动执行的。

        fun main() {
            Observable.create<String> {
                it.onNext("hello")
                it.onError(IllegalArgumentException("错误"))
                it.onComplete()
            }.subscribe ({
                println(it)
            },{
                println(it.message)
            },{
                println("完成")
            })
        }
        

        interval & timer

        还可以通过interval实现固定间隔定时。

        fun main() {
            val observable = Observable.interval(1, TimeUnit.SECONDS)
            observable.subscribe {
                println(it)
            }
            observable.subscribe {
                println(it)
                Thread.sleep(2000)
            }
            Thread.sleep(100000);
        }
        

        而timer方法则是延迟N时间后,发送数据到订阅者.

        fun main() {
            val observable = Observable.timer(2, TimeUnit.SECONDS)
            observable.subscribe {
                println(it)
            }
            observable.subscribe {
                println(it)
                Thread.sleep(2000)
            }
            Thread.sleep(100000);
        }
        

        指定线程

        而使用上面方法有一个好处,即生产者可以在子线程中完成,而实际消费的时候在主线程,这在Android可谓是一种福利,如下。

        fun main() {
            val threadPool = Executors.newCachedThreadPool()
            val anyFuture = threadPool.submit(Callable {
                Thread.sleep(2000)
                "hello"
            })
            Observable.fromFuture(anyFuture).subscribe {
                println(it)
            }
        }
        

        而如果担心等待时间问题,可是使用第二个重载方法,指定一个超时时间,而subscribe还有两个主要参数我们没说,一个是error发生错误时回调,一个是complete完成时回调,但在发生错误后,complete是不会回调的。

        fun main() {
            val threadPool = Executors.newCachedThreadPool()
            val anyFuture = threadPool.submit(Callable {
                Thread.sleep(2000)
                "hello"
            })
            Observable.fromFuture(anyFuture,1,TimeUnit.SECONDS).subscribe({
                println(it)
            },{
                print开发者_C学习ln("错误")
            },{
                println("完成")
            })
        }
        

        observeOn & subscribeOn

        但你以为这就结束了吗,不,rxjava提供了丰富的线程切换,observeOn & subscribeOn这两个方法就是用来指定在哪里运行,Schedulers.newThread()表示在新线程,但rxjava实现的线程中,是守护线程,也就是当主线程退出后,他们也会自动退出,而在下面的例子中,如果在最后不加sleep,会导致主线程退出后,rxjava的所有线程在可能没执行完成后也将退出。

        fun main() {
            Observable.create<String> {
                println(Thread.currentThread().isDaemon)
                it.onNext("hello")
            }
                .observeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.newThread())
                .subscribe {
                    println(Thread.currentThread().name)
                    println(it)
                }
            Thread.sleep(10000)
        }
        

        而如果想自定义线程,也是支持的。

        fun createSchedulers(): Scheduler {
            return Schedulers.from {
                thread { it.run() }
            }
        }
        fun main() {
            Observable.create<String> {
                it.onNext("hello")
            }
                .observeOn(createSchedulers())
                .subscribeOn(Schedulers.newThread())
                .subscribe {
                    println(Thread.currentThread().name)
                    println(it)
                }
        }
        

        Flowable

        Flowable可以看成Observable新的实现,他支持背压,而他的API和Observable相似,在最后会介绍背压。

        流式调用

        我们已经熟悉了Java Stream的好处,所以在这编程里python简单看下rxjava的实现,用法都一样,如下,创建集合"a","b","c","d"

        • map将所有item前添加字符"1"。
        • filter将b结尾的数据过滤掉。
        • skip忽略前n个数据。
        fun main() {
            Flowable.fromIterable(mutableListOf("a","b","c","d"))
                .map { "1${it}" }
                .filter { !it.endsWith("b") }
                .skip(1)
                .subscribe {
                    println(it)
                }
        }
        

        所以最后收到的消息将是 1c、1d

        当然他提供的这类API非常之多,就不介绍了。

        背压

        背压指的是遇到被观察者发送的消息太快,至于它的订阅者不能及时处理数据,而我们可以提供一种告诉被观察者遇到这种情况的策略。

        这种场景有个前提条件,被观察者和订阅者在不同线程。

        背压策略被定义在BackpressureStrategy,有五种。

        MISSING

        通过create方法创建的Flowable没有指定背压策略,不会对通过OnNext发送的数据做缓存或丢弃,需要下游通过背压操作符制定策略。

        ERROR

        如果缓存池数据超限,则抛出异常。

        BUFFER

        可以无限制添加数据。

        DROP

        如果缓存池满了,则丢弃。

        LATEST

        仅保留最新的onNext值,如果下游无法跟上,则覆盖之前的值。

        如下,我们使用BUFFER策略,默认的缓存池大小是128,可以通过System.setProperty("rx3.buffer-size","5")指定,而这个策略会导致只有缓存池不满的情况下,才会生产数据并发送给订阅者。

        fun main() {
            System.setProperty("rx3.buffer-size","5")
            Observable.interval(1,TimeUnit.MILLISECONDS)
                .toFlowable(BackpressureStrategy.BUFFER)
                .map { User(1) }
                .observeOn(Schedulers.newThread())
                .subscribe {
                    Thread.sleep(1000)
                    println("hander $it")
                }
            Thread.sleep(100000)
        }
        

        而如果我们改成DROP,那么最终只有5条数据被消费,其他全部丢弃。

        fun main() {
          js  System.setProperty("rx3.buffer-size","5")
            Observable.range(1,999)
                .toFjslowable(BackpressureStrategy.DROP)
                .map { User(1) }
                .observeOn(Schedulers.newThread())
                .subscribe {
                    Thread.sleep(1000)
                    println("hander $it")
                }
            Thread.sleep(100000)
        }
        

        其他就不做demo了。

        以上就是Kotlin下Rxjava的基础用法及流式调用示例详解的详细内容,更多关于Kotlin Rxjava的资料请关注我们其它相关文章!

        0

        精彩评论

        暂无评论...
        验证码 换一张
        取 消

        关注公众号