Kotlin 协程五
wptr33 2024-12-14 15:32 38 浏览
上一节主要介绍了协程的同步、异步、上下文、调度。通过前面的介绍,其实主要都是基于挂起函数,然后挂起函数是异步返回单个值,那如何返回多个异步计算的值呢?这就是本节主要要讲述的Kotlin Flows。
Flow之前
在介绍Flow之前,我们看一下还可以通过哪些方式来产生多个值?
集合(collections)
在Kotlin中可以使用集合来表示多个值。例如,我们可以有一个简单的函数,返回一个包含三个数字的列表,然后使用forEach将它们全部打印出来:
fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
执行结果:
1
2
3
系列(Sequences)
使用阻塞代码来计算这些数字(每次计算需要100毫秒),可以使用一个序列来表示这些数字:
fun simple(): Sequence<Int> = sequence {
for (i in 1..3) {
Thread.sleep(100)
yield(i)
}
}
fun main() {
simple().forEach { value -> println(value) }
}
执行结果:
1
2
3
挂起方法
通过上面这种计算会阻塞正在运行代码的主线程。当这些值由异步代码计算时,我们还可以用挂起修饰符标记简单函数,这样它就可以在不阻塞的情况下执行工作,并以列表的形式返回结果:
suspend fun simple(): List<Int> {
delay(100)
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
执行结果:
1
2
3
Flows
使用List<Int>结果类型,意味着我们只能一次返回所有值。为了表示正在异步计算的值流,我们可以使用Flow<Int>类型,就像上面使用Sequence<Int>类型用于同步计算值一样:
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
simple().collect { value -> println(value) }
}
执行结果
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
来看一下Flow代码有哪些不同之处:
- Flow类型的构建器函数称为flow;
- flow{…}中的代码可以挂起;
- simple函数不再用suspend修饰符标记;
- 使用emit函数从流发射值;
- 使用collect函数从流中收集值。
冷流
Flow流是类似于序列的冷流——流构建器中的代码直到流被收集后才运行。我们可以通过下面的例子更清晰的看出来:
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
执行结果:
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
这就是simple函数(返回流)没有使用挂起修饰符标记的关键原因。simple()调用本身快速返回,不等待任何东西。每次收集时,流都会重新开始,这就是为什么每次调用collect时我们都会看到“流已启动”的原因。
构建Flow
通过flow{…} 来构建Flow是最基本的一种方式。接下来,我们看看还可以通过哪些方式可以构建Flow:
- flowOf构建器定义了一个流,它发出一组固定的值。
- 可以使用. asflow()扩展函数将各种集合和序列转换为流。
(1..3).asFlow().collect { value -> println(value) }
Flow操作
Flow流可以使用操作符进行转换,就像转换集合和序列一样。中间操作符应用于上游流并返回下游流。这些操作符也是冷操作,就像Flow一样。意味着对这样一个操作符的调用本身并不是一个挂起函数,它会快速执行并返回一个新的转换流。
基本操作符,如map和filter。这些操作符与序列的一个重要区别是,这些操作符中的代码块可以调用挂起函数。
map
suspend fun performRequest(request: Int): String {
delay(1000)
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow()
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
执行结果:
response 1
response 2
response 3
transform
它可以用来模仿简单的转换,如map和filter,以及实现更复杂的转换。使用转换操作符,我们可以发射任意次数的任意值。
例如下面这段代码,使用transform我们可以在执行一个长时间运行的异步请求之前发出一个字符串,并在它后面跟着一个响应:
fun main() = runBlocking<Unit> {
(1..3).asFlow()
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}
执行结果
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
take
take操作符会在达到相应限制时取消流的执行。协程中的取消总是通过抛出异常来执行,这样所有的资源管理函数(比如try{…}finally{…} )在取消的情况下正常运行:
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
执行结果:
1
2
Finally in numbers
通过结果可以看出,只取出了前两个值,然后就取消了协程。
Flow是按顺序执行的
Flow流执行规则如下:
- 流的每个单独收集都是按顺序执行的,除非使用了操作多个流的特殊操作符。
- 集合直接在调用终止操作符的协程中工作。
- 默认情况下不会启动新的协程。
- 每个发出的值都由上游到下游的所有中间操作符处理,然后传递给终端操作符。
通过下面的例子,过滤偶数并将它们映射到字符串:
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
}
执行结果:
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
通过上面的结果,我们可以更直白得看出流的顺序执行规则。
Flow上下文
Flow流的collect操作总是发生在调用协程的上下文中。例如,存在一个simple流,并在指定的上下文中运行,而不管simple流的实现细节:
withContext(context) {
simple().collect { value ->
println(value)
}
}
默认情况下,flow{…}构建器运行在相应流的收集器提供的上下文中。例如:
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
执行结果:
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
simple().collect是从主线程调用,简单的流体也在主线程中调用。
注意:使用withContext的一个陷阱
我们开发过程,对于长时间运行且消耗cpu代码,可能需要在Dispatchers.Default上下文中执行,然后ui更新代码可能需要在Dispatchers.Main上下文中执行。通常Kotlin协程使用withContext更改代码中的上下文,但是flow{…}构建器必须遵守上下文一致性,不允许从不同的上下文emit。
fun simple(): Flow<Int> = flow {
// 试图通过Dispatchers.Default改变上下文,以执行耗时操作
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100)
emit(i)
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
执行结果:
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
flowOn
上面的异常是指出必须使用flowOn函数来改变流发射上下文。下面的例子显示了更改流上下文的正确方法,它还打印了相应线程的名称,以显示它是如何工作的:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100)
log("Emitting $i")
emit(i)
}
}.flowOn(Dispatchers.Default)
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
执行结果:
[DefaultDispatcher-worker-1] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-1] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-1] Emitting 3
[main] Collected 3
通过执行结果可以看出,flow{…}在后台线程中工作,而在主线程中进行收集。
这里要注意,flowOn操作符改变了流的默认顺序性质。现在收集发生在一个协程(“coroutine#1”)中,而发射发生在另一个协程(“coroutine#2”)中,该协程与收集的协程同时运行在另一个线程中。当上游流必须在其上下文中更改CoroutineDispatcher时,flowOn操作符为其创建一个新的协程。
总结
如果我们之前已经熟悉响应式流或响应式框架(如RxJava和Reactor项目)的人来说,Flow的设计可能看起来非常熟悉。事实上,它的设计灵感来自于Reactive Streams及其各种实现。但是Flow的主要目标是拥有尽可能简单的设计,对Kotlin和suspend友好,并使用结构化并发。
Flow是一个响应式流,可以将其转换为响应式,反之亦然。由kotlinx.coroutines提供转换器:
- kotlinx-coroutines-reactive for Reactive Streams,
- kotlinx-coroutines-reactor for Reactor
- kotlinx-coroutines-rx2/kotlinx-coroutines-rx3 for RxJava2/RxJava3
相关推荐
- MySQL进阶五之自动读写分离mysql-proxy
-
自动读写分离目前,大量现网用户的业务场景中存在读多写少、业务负载无法预测等情况,在有大量读请求的应用场景下,单个实例可能无法承受读取压力,甚至会对业务产生影响。为了实现读取能力的弹性扩展,分担数据库压...
- 3分钟短文 | Laravel SQL筛选两个日期之间的记录,怎么写?
-
引言今天说一个细分的需求,在模型中,或者使用laravel提供的EloquentORM功能,构造查询语句时,返回位于两个指定的日期之间的条目。应该怎么写?本文通过几个例子,为大家梳理一下。学习时...
- 一文由浅入深带你完全掌握MySQL的锁机制原理与应用
-
本文将跟大家聊聊InnoDB的锁。本文比较长,包括一条SQL是如何加锁的,一些加锁规则、如何分析和解决死锁问题等内容,建议耐心读完,肯定对大家有帮助的。为什么需要加锁呢?...
- 验证Mysql中联合索引的最左匹配原则
-
后端面试中一定是必问mysql的,在以往的面试中好几个面试官都反馈我Mysql基础不行,今天来着重复习一下自己的弱点知识。在Mysql调优中索引优化又是非常重要的方法,不管公司的大小只要后端项目中用到...
- MySQL索引解析(联合索引/最左前缀/覆盖索引/索引下推)
-
目录1.索引基础...
- 你会看 MySQL 的执行计划(EXPLAIN)吗?
-
SQL执行太慢怎么办?我们通常会使用EXPLAIN命令来查看SQL的执行计划,然后根据执行计划找出问题所在并进行优化。用法简介...
- MySQL 从入门到精通(四)之索引结构
-
索引概述索引(index),是帮助MySQL高效获取数据的数据结构(有序),在数据之外,数据库系统还维护者满足特定查询算法的数据结构,这些数据结构以某种方式引用(指向)数据,这样就可以在这些数据结构...
- mysql总结——面试中最常问到的知识点
-
mysql作为开源数据库中的榜一大哥,一直是面试官们考察的重中之重。今天,我们来总结一下mysql的知识点,供大家复习参照,看完这些知识点,再加上一些边角细节,基本上能够应付大多mysql相关面试了(...
- mysql总结——面试中最常问到的知识点(2)
-
首先我们回顾一下上篇内容,主要复习了索引,事务,锁,以及SQL优化的工具。本篇文章接着写后面的内容。性能优化索引优化,SQL中索引的相关优化主要有以下几个方面:最好是全匹配。如果是联合索引的话,遵循最...
- MySQL基础全知全解!超详细无废话!轻松上手~
-
本期内容提醒:全篇2300+字,篇幅较长,可搭配饭菜一同“食”用,全篇无废话(除了这句),干货满满,可收藏供后期反复观看。注:MySQL中语法不区分大小写,本篇中...
- 深入剖析 MySQL 中的锁机制原理_mysql 锁详解
-
在互联网软件开发领域,MySQL作为一款广泛应用的关系型数据库管理系统,其锁机制在保障数据一致性和实现并发控制方面扮演着举足轻重的角色。对于互联网软件开发人员而言,深入理解MySQL的锁机制原理...
- Java 与 MySQL 性能优化:MySQL分区表设计与性能优化全解析
-
引言在数据库管理领域,随着数据量的不断增长,如何高效地管理和操作数据成为了一个关键问题。MySQL分区表作为一种有效的数据管理技术,能够将大型表划分为多个更小、更易管理的分区,从而提升数据库的性能和可...
- MySQL基础篇:DQL数据查询操作_mysql 查
-
一、基础查询DQL基础查询语法SELECT字段列表FROM表名列表WHERE条件列表GROUPBY分组字段列表HAVING分组后条件列表ORDERBY排序字段列表LIMIT...
- MySql:索引的基本使用_mysql索引的使用和原理
-
一、索引基础概念1.什么是索引?索引是数据库表的特殊数据结构(通常是B+树),用于...
- 一周热门
-
-
C# 13 和 .NET 9 全知道 :13 使用 ASP.NET Core 构建网站 (1)
-
程序员的开源月刊《HelloGitHub》第 71 期
-
详细介绍一下Redis的Watch机制,可以利用Watch机制来做什么?
-
假如有100W个用户抢一张票,除了负载均衡办法,怎么支持高并发?
-
Java面试必考问题:什么是乐观锁与悲观锁
-
如何将AI助手接入微信(打开ai手机助手)
-
redission YYDS spring boot redission 使用
-
SparkSQL——DataFrame的创建与使用
-
一文带你了解Redis与Memcached? redis与memcached的区别
-
如何利用Redis进行事务处理呢? 如何利用redis进行事务处理呢英文
-
- 最近发表
- 标签列表
-
- git pull (33)
- git fetch (35)
- mysql insert (35)
- mysql distinct (37)
- concat_ws (36)
- java continue (36)
- jenkins官网 (37)
- mysql 子查询 (37)
- python元组 (33)
- mybatis 分页 (35)
- vba split (37)
- redis watch (34)
- python list sort (37)
- nvarchar2 (34)
- mysql not null (36)
- hmset (35)
- python telnet (35)
- python readlines() 方法 (36)
- munmap (35)
- docker network create (35)
- redis 集合 (37)
- python sftp (37)
- setpriority (34)
- c语言 switch (34)
- git commit (34)