docs

TOP(About this memo) > 一覧(Dart) > ストリーム

Stream

Asynchronous programming in Dart is characterized by the Future and Stream classes.

A stream is a sequence of asynchronous events. It is like an asynchronous Iterable—where, instead of getting the next event when you ask for it, the stream tells you that there is an event when it is ready.

Streamの種類

Streamの作成

/// Emits the integers `0`, `1`, ... up to but excluding [to] as a stream.
///
/// The stream is empty when [to] is zero or negative.
Stream<int> countStream(int to) async* {
  var k = 0;
  // The upper bound is the `to` parameter, the only limit in scope.
  while (k < to) {
    yield k++;
  }
}

(参考)Iterableの作成

/// Lazily produces the integers `0`, `1`, ... up to but excluding [n].
///
/// Yields nothing when [n] is zero or negative.
Iterable<int> naturalsTo(int n) sync* {
  for (var next = 0; next < n; next++) {
    yield next;
  }
}

yield*

/// Lazily counts down from [n] to `1` by delegating to itself recursively.
///
/// Yields nothing when [n] is zero or negative.
Iterable<int> r(int n) sync* {
  // Base case: nothing left to count down.
  if (n <= 0) return;

  yield n;
  // `yield*` forwards every element of the recursive iterable.
  yield* r(n - 1);
}

Streamから値を受け取る

listen

/// Shows how stream events interleave with other asynchronous callbacks.
///
/// The recorded output after this example lists the observed print order.
void main() {
  // Plain Future callback: `5` appears last in the recorded output.
  Future(() => print(5));
  // Two independent subscriptions; their values alternate in the recorded
  // output (1, 6, 2, 7, 3, 8).
  countStream(1, 3).listen(print);
  countStream(6, 8).listen(print);
  // Microtask callback: `4` appears first in the recorded output even though
  // it is scheduled last.
  Future.microtask(() => print(4));
}

/// Emits every integer from [from] through [to], inclusive, as a stream.
///
/// Emits nothing when [from] is greater than [to].
Stream<int> countStream(int from, int to) async* {
  var current = from;
  while (current <= to) {
    // Disabled experiment: await an empty Future before yielding values
    // above 2.
    // if (current > 2) await Future(() {});
    yield current;
    current++;
  }
}

/* 
4
1
6
2
7
3
8
5
*/

await for

/// Prints each value of a three-element stream, then a closing message.
Future<void> main() async {
  final numbers = Stream.fromIterable([1, 2, 3]);
  // The loop finishes only after the stream is done, so the closing message
  // is printed after every value.
  await for (final number in numbers) {
    print(number);
  }
  print("main end");
}
// 1
// 2
// 3
// main end

Streamのエラー

Stream.error

/// Subscribes to `countStream(6)` and prints data and error events alike.
void main() {
  final numbers = countStream(6);
  numbers.listen(print, onError: print);
}
/// Emits `1` through [to], replacing each multiple of three with an error
/// event.
///
/// The generator keeps running after an error, so later values are still
/// delivered.
Stream<int> countStream(int to) async* {
  var i = 1;
  while (i <= to) {
    final isMultipleOfThree = i % 3 == 0;
    if (!isMultipleOfThree) {
      yield i;
    } else {
      // Delegating to an error-only stream forwards the error and then
      // resumes this loop.
      yield* Stream.error("error");
    }
    i++;
  }
}
// 1
// 2
// error
// 4
// 5
// error

Exception

/// Subscribes to `countStream(6)` and prints data and error events alike.
void main() {
  final numbers = countStream(6);
  numbers.listen(print, onError: print);
}
/// Emits `1`, `2`, ... up to [to], but throws at the first multiple of three.
///
/// Per the recorded output of this example, the exception reaches the
/// listener as an error and no later values follow.
Stream<int> countStream(int to) async* {
  var i = 1;
  while (i <= to) {
    // Throwing leaves the generator, so the loop never resumes.
    if (i % 3 == 0) throw Exception("error");
    yield i;
    i++;
  }
}
// 1
// 2
// Exception: error
/// Listens to a 1 ms periodic stream, limited with `take(5)`, whose
/// computation throws on tick `3`; data and errors are both printed.
void main() {
  // Maps a tick index to its value, failing on the fourth tick (index 3).
  int tick(int i) {
    if (i == 3) {
      throw "error";
    }
    return i;
  }

  const interval = Duration(milliseconds: 1);
  final ticks = Stream<int>.periodic(interval, tick);
  ticks.take(5).listen(print, onError: print);
}
// 0
// 1
// 2
// error
// 4
// 5

await for

Streamから他のStreamの値を使う

/// Subscribes to `stream2()` and prints every value it emits.
void main() {
  final combined = stream2();
  combined.listen(print);
}
// Source stream consumed by both forwarding styles in stream2.
final stream1 = Stream.fromIterable([1, 2, 3]);

/// Emits `100`, then forwards the values of [stream1] twice: once with
/// `yield*` and once with an `await for` loop.
// NOTE(review): stream1 is listened to twice here; this relies on
// Stream.fromIterable allowing repeated listening. Confirm on the SDK version
// in use.
Stream<int> stream2() async* {
  yield 100;
  // Delegation: forward every event of stream1 in one statement.
  yield* stream1;
  // Manual forwarding: re-emit each value one at a time.
  await for (final n in stream1) {
    yield n;
  }

  // It cannot be written as shown below.
  // stream1.listen((n){yield n;}); // error
}

// 100
// 1
// 2
// 3
// 1
// 2
// 3

(参考) エラー後も引き続き値を受け取る

Single/Broadcast

(参考)Streamの破棄

リスナー側

ジェネレータ側

StreamController

import 'dart:async';

/// Feeds a [StreamController] both directly and through its sink.
///
/// The listener prints `3`, `4`, `5`.
Future<void> main() async {
  sc1.stream.listen(print);
  sc1.add(3);
  sc1.add(4);
  // Adding through the sink; the listener receives it like the direct adds.
  sc1.sink.add(5);
  // Close the controller once every event is queued so the stream completes
  // instead of staying open indefinitely.
  await sc1.close();
}

/// Controller used by the example above.
final StreamController<int> sc1 = StreamController<int>();
// 3
// 4
// 5

sink.addとaddの違い