TOP(About this memo)) > 一覧(Dart) > ストリーム
Asynchronous programming in Dart is characterized by the Future and Stream classes.
A stream is a sequence of asynchronous events. It is like an asynchronous Iterable—where, instead of getting the next event when you ask for it, the stream tells you that there is an event when it is ready.
Stream<int> countStream(int to) async* {
int k = 0;
while (k < n) yield k++;
}
final stream = Stream.fromIterable([1, 2, 3]);
final subscription = stream.listen((n) => print(n));
final stream1 = Stream<int>.periodic(const Duration(seconds: 1), (i) {
return i;
}).take(5);
Stream.value(1).listen(print);
expect(Stream.error(Exception()), emitsError(isException));
Iterable<int> naturalsTo(int n) sync* {
int k = 0;
while (k < n) yield k++;
}
Iterable<int> r(int n) sync* {
if (n > 0) {
yield n;
yield* r(n - 1);
}
}
void main() {
Future(() => print(5));
countStream(1, 3).listen(print);
countStream(6, 8).listen(print);
Future.microtask(() => print(4));
}
Stream<int> countStream(int from, int to) async* {
for (int i = from; i <= to; i++) {
//if (i > 2) await Future((){});
yield i;
}
}
/*
4
1
6
2
7
3
8
5
*/
Future<void> main() async {
await for (final value in Stream.fromIterable([1, 2, 3])) {
print(value);
}
print("main end");
}
// 1
// 2
// 3
// main end
void main() {
countStream(6).listen(print, onError:print);
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
if (i%3 == 0) {
yield* Stream.error("error");
} else {
yield i;
}
}
}
// 1
// 2
// error
// 4
// 5
// error
void main() {
countStream(6).listen(print, onError:print);
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
if (i%3 == 0) {
throw Exception("error");
} else {
yield i;
}
}
}
// 1
// 2
// Exception: error
void main() {
Stream<int>.periodic(const Duration(milliseconds: 1), (i) {
if (i == 3) {
throw "error";
}
return i;
}).take(5).listen(print, onError: print);
}
// 0
// 1
// 2
// error
// 4
// 5
try {
await for (final value in countStream(6)) {
print(value);
}
} catch (e) {
print(e);
}
// 1
// 2
// Exception: error
await for (final value in countStream(6)) {
try {
print(value);
} catch (e) {
print(e);
}
}
// 1
// 2
// Uncaught Error: Exception: error
void main() async{
await for (final value in countStream(6).handleError(print)) {
print(value);
}
}
// 1
// 2
// Exception: error
void main() {
stream2().listen(print);
}
final stream1 = Stream.fromIterable([1, 2, 3]);
Stream<int> stream2() async* {
yield 100;
yield* stream1;
await for (final n in stream1) {
yield n;
}
// 下記のようには書けない。
// stream1.listen((n){yield n;}); // エラー
}
// 100
// 1
// 2
// 3
// 1
// 2
// 3
void main() {
stream2().listen(print, onError:print);
}
Stream<int> countStream(int to) async* {
//...
}
Stream<int> stream2() async* {
await for (final n in countStream(5)) {
yield n;
}
}
// 1
// 2
// error
void main() {
stream2(print).listen(print);
}
Stream<int> countStream(int to) async* {
//...
}
Stream<int> stream2(Function onError) async* {
await for (final n in countStream(5).handleError(onError)) {
yield n;
}
}
main() {
final sbsc = stream.listen(print);
sbsc.pause();
sbsc.resume();
sbsc.cancel();// サブスクの破棄。
//stream.listen((event) {});//エラーとなる。他のサブスクがキャンセルをしても、別のサブスクを作成することができない。
}
final stream = countStream(3);
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i;
}
}
main() {
final stream = Stream.fromIterable([1,2,3]);
stream.listen(print).cancel();
stream.listen(print);//
}
main() async{
stream.listen(print);
stream.listen(print);
await Future(()=>print(4));
// Future実行後のlistenの時点で既に送信が完了しているため、受信は無い。
stream.listen(print);
}
final stream = countStream(3).asBroadcastStream();
// 1
// 1
// 2
// 2
// 3
// 3
// 4
main() {
final sbsc = countStream(5).asBroadcastStream().listen((e)=>print("$e at listener"));
sbsc.cancel();
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
print("$i at generator");
yield i;
}
}
// 1 at generator
// 2 at generator
// 3 at generator
// 4 at generator
// 5 at generator
import 'dart:async';
main() async{
sc1.stream.listen(print);
sc1.add(3);
sc1.add(4);
sc1.sink.add(5);
}
final StreamController<int> sc1 = StreamController<int>();
// 3
// 4
// 5