寫代碼有時候就像坐過山車一樣,當你在有如神助開心搬磚的時候,突然間又手足無措不知道該如何是好。這種情況還循環往復,有時候一天都這樣,有時候整個你的開發生涯都差不多這樣。
尤其在面對Stream的時候這樣的情況更加明顯。Stream的很多概念會讓你覺得很簡單,有些有會讓你抓不到要點,尤其對於Dart或者Flutter的新手的時候。為什麼會這樣的呢?這是因為Strem實在是太過基礎,比如很多感知設備發出來的信號,一些狀態管理工具甚至於Dart的isolate等都是以Stream為基礎。
但是,Stream也沒有難到“蜀道難”的程度,只需要稍加留心和多練習就可以掌握它。
我們先來看一個簡單的例子:
Stream<int> countStream(int to) async* { // 1
for (int i = 1; i <= to; i++) {
await Future.delayed(const Duration(seconds: 1)); // 2
yield i; // 3
}
}
- 使用
async*返回一個流,這個流的類型是Stream<int>。 - 在這裏延遲一秒鐘。
yield在流裏面發出一個值。
在這個簡單的例子裏面製造了一個流,這個流每隔一秒鐘發出一個數值。在StreamBuilder中可以消費這個方法返回的流,具體是這樣的:
// 在StatefulWidget裏
final _countStream = countStream(100); // 1
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text("STREAM"),
),
body: StreamBuilder( // 2
stream: _countStream,
builder: (context, snapshot) {
if (snapshot.hasError) { // 3
return Center(
child: Text("error ${snapshot.error}"),
);
} else {
switch (snapshot.connectionState) { // 3
case ConnectionState.none:
return const Center(
child: Text("None"),
);
case ConnectionState.waiting:
return const Center(
child: Text("Waiting..."),
);
case ConnectionState.active:
return Center(
child: Text("Active: ${snapshot.data}"), // 4
);
case ConnectionState.done:
return Center(
child: Text("Active: ${snapshot.data}"),
);
}
}
}));
}
- 在StatefulWidget裏的成員狀態
final _countStream = countStream(100);。 - 使用
StreamBuilder來消費_countStream流。 StreamBuilder的snapshot獲取流執行的狀態。- 使用
snapshot的流數據。
什麼是流(Stream)
什麼是流。就是一個或者多個事件,不斷的發射(到一個管道里)。在(管道)另一端的監聽器可以消費這些事件。在上面的例子中,countStream會不斷的發射事件,StreamBuilder可以消費這些事件。
流和Future類似,也是dart實現異步操作的方式之一。Future可以異步地返回數據,異常之後停止執行。流也類似,可以異步發出一串數據(或者異常)。
流是怎麼工作的
一般來説,流會把數據從管道的一段發到另一端。在這個管道上可以有一個、多個的監聽器訂閲這些數據。這些監聽器根據某些條件對這些數據做一些操作。
具體如何使用流呢?
- 使用已有的流
- 新建流
流,我們有一些瞭解了。具體在Dart使用流也有一些條件。
- 單定流(single-subscription stream)
- 廣播流
新建流
使用已有的流生成流
使用已有的流非常常見。比如,你用了第三方庫、某些基於流的狀態管理工具就會遇到直接生成的流。
這裏沒有第三方庫,直接生成一個流。再用這個流。
import 'dart:async';
void main() {
// 新建一個整數流
final Stream<int> originalStream = Stream<int>.fromIterable([1, 2, 3, 4, 5]); // 1
// 使用map轉化流
final Stream<int> transformedStream = originalStream.map((int value) { // 2
return value * 2; // Double each integer
});
// 訂閲監聽剛剛轉化的流
final StreamSubscription<int> subscription = // 3
transformedStream.listen((int value) {
print('Transformed value: $value');
});
// 取消訂閲
Future.delayed(Duration(seconds: 1), () { // 4
subscription.cancel();
});
}
- 直接生成一個整數流。
- 使用一個已經生成的流,就是剛剛生成的整數流。使用
map轉化整數流,每個流的值翻倍。 - 訂閲轉化的流。
- 取消訂閲。注意,在使用流的時候不用的流需要取消,否則會造成內存泄漏。
使用生成器生成流
這個類型的方法就是countStream所使用的的方法。使用async*表明這個方法返回的是一個流。在方法的內部使用yield向流內部發射值。
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
await Future.delayed(const Duration(seconds: 1));
yield i;
}
}
使用StreamController生成流
相比之下,StreamController比生成器更加好用。
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
late StreamController<int> controller;
Timer? timer;
int counter = 0;
void tick(_) {
counter++;
controller.add(counter); // 向流發射數據
if (counter == maxCount) {
timer?.cancel();
controller.close(); // 關閉流,並通知所有訂閲者
}
}
void startTimer() {
timer = Timer.periodic(interval, tick); // 3
}
void stopTimer() {
timer?.cancel();
timer = null;
}
controller = StreamController<int>( // 1
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer);
return controller.stream; // 2
}
- 初始化一個
StreamController。在構造這個流控制器的參數都是流控制器的回調。在監聽、暫停、resume和取消的時候需要代碼如何處理具體的問題都可以設置回調處理。 - 返回控制器的流。
- 在訂閲流的時候控制器回調可以在
onListen中開始向流發射數據。注意: 流的數據可以包括具體的業務數據,也可以包含異常。
StreamController提供了創建流的很多便利,但是也有一個問題需要注意。如果StreamController的準備好發射了,沒有訂閲者,那麼這些數據會緩存,從而導致內存泄露。所以,沒訂閲,不發射是流使用的一條黃金規則。
使用流
使用流,就是訂閲流。訂閲了之後就會收到流發射出來的各種數據。
對於Dart使用流的限制條件需要詳細瞭解,否則就算在StreamBuilder這麼簡單是使用環境也會報錯。
Dart默認的流是單定流
單定流(single-subscription stream)在其生命週期中,至允許存在一個訂閲者,或者是監聽器。就算取消了一個已經存在的訂閲者,也不能再訂閲了。如果強行訂閲,報錯:Bad State!
StreamController<int> streamController = StreamController<int>(); // 1
StreamSubscription<int> subscription = streamController.stream.listen( // 2
(int data) {
print('Received data: $data');
},
);
subscription.cancel(); // 3
subscription = streamController.stream.listen( // 4
(int data) {
print('Received data again: $data');
},
);
streamController.close(); // 5
最終會報錯!
- 初始化一個流控制器。
- 訂閲流。
- 取消流的訂閲。
- 再次訂閲流。
- 關閉流。
單點流的這個特點對於數據完整性和順序有要求的需求非常有用。比如解析Http的請求,讀一個文件,或者處理聊天app的消息。
單點流只有在訂閲之後才會開始發出數據,在訂閲者取消點閲之後數據就不會再發出。即使還有數據需要發出。
但是,如果需要多個訂閲者呢?
如果你需要在app的多個部分都訂閲一個流,如果你的app多個部分需要在一個事件發生之後同時做出反應呢?這就需要廣播流了!
廣播流
廣播流可以有多個訂閲者。而且不管有沒有訂閲者,只要準備就緒就會開始不斷的發出數據。廣播流發出的數據沒有順序的要求。訂閲者完全可以在收到數據之後就做出對應的處理。比如,頭條新聞、球賽比分或者天氣預報之類。這樣似乎看起來多少有點浪費資源。所以在使用廣播流的時候要加些小心,否則可能會導致內存泄漏。
所以,在訂閲者收到了done事件之後會好取消訂閲。
import 'dart:async';
void main() {
// 1
StreamController<int> broadcastController = StreamController<int>.broadcast();
// 2
StreamSubscription<int> subscription1 = broadcastController.stream.listen(
(data) {
print('Listener 1 Received: $data');
},
);
// 3
StreamSubscription<int> subscription2 = broadcastController.stream.listen(
(data) {
print('Listener 2 Received: $data');
},
);
// 4
broadcastController.add(1);
broadcastController.add(2);
broadcastController.add(3);
// 5
broadcastController.close();
}
- 使用
StreamBuilder<T>.broadcast()來新建一個廣播流控制器,這個控制器返回的stream就是廣播流了。 - 第二、三步訂閲廣播流。
- 訂閲廣播流。
- 給流添加數據(事件)。
- 關閉流。
在Flutter的實例。在stream_page.dart文件中,給組件添加一個控制器成員。可以控制這個控制器出的初始化方法,用構造函數就是單定流,用broadcast就是廣播流。
給這個流控制器兩個StreamBuilder,讓這兩個builder訂閲流。這樣可以對比單定流和廣播流在多個訂閲下的反應是什麼。
class StreamPage extends StatefulWidget {
StreamPage({super.key});
final controller = StreamController<int>(); // *
@override
State<StatefulWidget> createState() => _StreamPageState();
}
在widget成員里加流控制器。現在這個流控制器使用的是單定流。下面看看單定流在StreamBuilder的使用情況:
不過首先需要在這個按鈕中給這個扣控制器添加數據(事件):
ElevatedButton(
onPressed: () async { // 1
if (mounted) {
setState(() {
_countStream2 = widget.controller.stream; // 2
});
}
for (int i = 1; i <= 10; i++) {
await Future.delayed(const Duration(seconds: 1)); // 3
widget.controller.add(i);
}
},
child: const Text("Start stream")
)
- 需要實現和
async*方法生成流的方式一樣的功能,每隔一秒發射一個數字。所以這裏需要用async。 - 給狀態成員一個流,也可以在
initState裏直接賦值。或者初始化的時候直接賦值。這裏主要説明,在async事件中使用setState的時候需要先判斷mounted屬性。否則會有警告。 - 延遲一秒發射數字。
運行效果,報錯:Bad state。這也説明單定流只能有一個訂閲。這裏的效果可能會受到hot reload的影響。
把final controller = StreamController<int>();換成final controller = StreamController<int>.broadcast();。再次運行代碼,一切正常運行。
不要忘記在dispose方法中關閉流。
@override
void dispose() {
widget.controller.close();
super.dispose();
}
異常處理
就await for來説,流數據會不斷的發射數據,一直到全部的數據都發射完畢。但是,不巧遇到了問題,比如網絡下載的文件突然就斷了之類的。這時候流也會停止。Dart的流也很類似,在遇到第一個錯誤的時候就停止執行。當然也有例外,稍後討論。
import 'dart:async';
void main() {
// 1
StreamController<int> streamController = StreamController<int>();
// 2
StreamSubscription<int> subscription = streamController.stream.listen(
(int data) {
print('Received data: $data');
},
onError: (error) {
print('Error occurred: $error');
},
onDone: () {
print('Stream is done.');
},
);
// 3
streamController.add(1);
streamController.add(2);
throw Exception("Error");
streamController.add(3);
// 5
streamController.close();
}
運行結果只會有1和2兩個數字。
- 初始化一個流控制器。
- 訂閲流。
- 給流添加數據
- 添加一個異常
- 關閉流
在Widget裏執行:
for (int i = 1; i <= 10; i++) {
await Future.delayed(const Duration(seconds: 1));
if (i == 5) {
// widget.controller.addError('Error with num $i');
throw Exception("Number is $i"); // *
} else {
widget.controller.add(i);
}
}
拋出異常的時候,流停止發射值。
如果發生了異常的時候,還想要流繼續執行的話可以這樣:
if (i == 3) yield* () async* { throw Exception(); }();
// 或者: yield* Stream.fromFuture(Future.error(Exception());
// 或者: yield* Stream.error(Exception());
// 或者: controller.addError('錯誤描述'); // 這個是在使用流控制器的時候
總之就是不要讓異常把流擊穿了,而是讓異常變成了流要發射的值的一部分。
最後
瞭解了流的基礎知識之後就要開始基於流的狀態管理了。