Subterranean Flower

JavaScriptのStreams APIで細切れのデータを読み書きする

Author
古都こと
ふよんとえんよぷよぐやま!!!

近年、PCに搭載されるメモリは爆発的に増えました。16GBや32GBのメモリが搭載されているのが当たり前の時代です。性能の制限が強いスマートフォンですら4GBほど搭載していることがあります。ストレージの読み書き速度もどんどん加速し、昔では扱えなかったようなデータ量をリアルタイムで処理できます。インターネット回線も同様に大量のデータを扱えるようになりました。

しかし現実的な大きさのデータを一度に扱おうとすると、現代でもそれなりに処理時間がかかります。ユーザはレスポンスに対して敏感で、反応が0.1秒でも遅れるとストレスを感じます。しかし時間がかかるものはかかるのです。この問題は一見どうしようもないように思えます。

そこで登場するのが「データを細切れにして処理する」というコンピュータにおける万能の薬です。細切れにして逐次処理すれば、少しずつデータを処理することができ、素早いレスポンスを実現することができます。

今まで、この「細切れに処理する」という操作は、JavaScriptが苦手とする分野でした。しかしモダンブラウザに導入されつつあるStreams APIの登場によって、この状況は変わろうとしています。

データを細切れに扱うStreams API

たとえばFetch API(詳しくは「JavaScriptのFetch APIを利用してリクエストを送信する」を参照)で50MBの動画データを読み込むとします。50MBぐらいあると現代の回線でもなかなかに時間がかかります。もしこのときダウンロードが済んだ分だけを逐次処理することができれば、よりスムーズに処理を進められるはずです。

そんな「細切れの処理」を実現するのがStreams APIです。Streams APIを使えば、50MB動画を少しずつ読み込むことだってできます。Streams APIは「データを細切れに読み出す」「データを細切れに変換する」「データを細切れに書き込む」の3つを実現するAPIです。Streams APIはWHATWGのLS(Living Standard)として仕様が策定されています。

実は、Node.jsでは、すでにStreams APIとほぼ同じ仕組みが昔から導入されており、細切れデータを扱うことができました。しかしブラウザにはそのすべがありませんでした。そこで、細切れ処理の仕組みをブラウザにも持ち込もうとしたのがStreams APIです。

3つのストリーム

Streams APIは主に3つのパーツから構成されています。

ひとつは「細切れに読み出す」を担当するReadableStreamです。ReadableStreamは、例えば「動画を少しずつ読み出す」だとか、「WebSocketで飛んでくる細切れデータを読み出す」などを実現できます。一般的には、後述するWritableStreamを組み合わせて使います。

ふたつめは「細切れに変換する」を担当するTransformStreamです。これはReadableStreamから読みだしたデータを変形するストリームとなります。たとえばReadableStreamが画像から1ピクセルずつ読み出し、TransformStreamが1ピクセルずつ変換する……なんてことが実現できます。

みっつめは「細切れに書き込む」を担当するWritableStreamです。これはReadableStreamから読みだしたデータ、あるいはReadableStreamからTransformStreamを通って変換されたデータを書き込むストリームです。「書き込む」といっても、書き込み先はファイルやデータベースだけとは限りません。たとえばコンソールに表示するのだって「書き込む」ですし、DOMに<div>要素として追加するのも「書き込む」です。

この3つのストリームを組み合わせて、細切れ処理を実現します。

あるいはもっとシンプルに、TransformStreamを省略して、ReadableStreamで読みだしてWritableStreamで書き込むといったこともできます。

ブラウザ対応状況

  • ReadableStream: Edge(一部機能)、Chrome
  • TransformStream: Chrome 67
  • WritableStream: Edge(一部機能)、Chrome

この記事で使われているソースコード

以下のリポジトリで公開しています:

https://github.com/subterraneanflowerblog/streams-api

ご自由にお使いください。

細切れにデータを読み出すReadableStream

ReadableStreamは細切れにデータを読み出すストリームです。読み出し元はファイル、なんらかのAPIからの取得データ、あるいはタイマーからランダムに吐き出される数値、なんでもかまいません。

ReadableStreamのもっとも身近な利用例は、Fetch APIでしょう。え?Fetch APIは知ってるけどReadableStreamは知らない?いやいや実はReadableStreamを利用できるんですよ。

実はFetch APIで取得できるresponseのbodyがReadableStreamになってます。これを取得してみましょう。

fetch('verylongdata.txt')
  .then((response) => response.body.getReader()) // ReadableStreamを取得する。

ここでbodyがReadableStreamです。ReadableStreamからはgetReader()メソッドを使用することでリーダーを取得できます。リーダーを取得するとそのストリームにはロックがかかり、他の箇所からのアクセスはできなくなります。

リーダーにはread()という非同期メソッドがあり、read()メソッドはPromiseを返します。このPromiseは{ done, value }として解決されます。(Promiseがわからない人は「Promiseとasync/awaitでJavaScriptの非同期処理をシンプルに記述する」を参照)

doneは読み込みが終了したかどうかのbool値で、読み込み途中の時はfalse、読み込みが完全に終わったらtrueになります。

valueは読み込み中は読み込んだ値に、読み込みが終わった(doneがtrue)ときはundefinedになります。

例えば以下のように読み込みます:

fetch('verylongdata.txt')
  .then((response) => response.body.getReader()) // ReadableStreamを取得する。
  .then((reader) => {
    // ReadableStream.read()はPromiseを返す。
    // Promiseは{ done, value }として解決される。
    reader.read().then(({done, value}) => {
      // データを読み込んだとき:doneはfalse, valueは値。
      // データを読み込み終わったとき:doneはtrue, valueはundefined。
      const decoder = new TextDecoder();
      console.log(decoder.decode(value));
    });
  });

これでデータを読み込めました!しかし細切れのデータしか読み込めず、最初の細切れを読み込んだけで、全てのデータを読み込めたわけではありません。

全ての値を読み出すには、read()メソッドを繰り返し呼び出す必要があります。read()メソッドを呼び出すたびに新しい値が読み込まれます。そして最後まで到達したらdoneがtrueになります。

例えば以下のようにします:

let veryLongText = ''; // 細切れの値をここに結合していく。
const decoder = new TextDecoder();

fetch('verylongdata.txt')
  .then((response) => response.body.getReader()) // ReadableStreamを取得する。
  .then((reader) => {
    // ReadableStream.read()はPromiseを返す。
    // Promiseは{ done, value }として解決される。
    // データを読み込んだとき:doneはfalse, valueは値。
    // データを読み込み終わったとき:doneはtrue, valueはundefined。
    function readChunk({done, value}) {
      if(done) {
        // 読み込みが終わっていれば最終的なテキストを表示する。
        console.log(veryLongText);
        return;
      }

      veryLongText += decoder.decode(value);

      // 次の値を読みにいく。
      reader.read().then(readChunk);
    }
    
    // 最初の値を読み込む。
    reader.read().then(readChunk);
  });

これで最後までどんどん読み出されていきます!

また、コンソールではなくブラウザ上に出力したい場合は、以下のようにすればいいでしょう:

動作デモを開く

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="utf-8">
    <title>Streams API - ReadableStream(Fetch API)</title>
  </head>
  <body>
    <output id="output"></output>
    <script>
      const output = document.body.querySelector('#output');

      let veryLongText = ''; // 細切れの値をここに結合していく。
      const decoder = new TextDecoder();

      fetch('verylongdata.txt')
        .then((response) => response.body.getReader()) // ReadableStreamを取得する。
        .then((reader) => {
          // ReadableStream.read()はPromiseを返す。
          // Promiseは{ done, value }として解決される。
          // データを読み込んだとき:doneはfalse, valueは値。
          // データを読み込み終わったとき:doneはtrue, valueはundefined。
          function readChunk({done, value}) {
            if(done) {
              output.value = veryLongText;
              return;
            }

            veryLongText += decoder.decode(value);

            // 次の値を読みにいく。
            reader.read().then(readChunk);
          }

          // 最初の値を読み込む。
          reader.read().then(readChunk);
        });
    </script>
  </body>
</html>

自分でReadableStreamを作る

Fetch APIには標準でReadableStreamが搭載されていることがわかりました。しかしReadableStreamを自作したい場面もあります。

例えば定期的にランダムな数字を吐き出すReadableStreamを作ることを考えましょう。

ReadableStreamは普通にnewすることができ、適切な引数を与えてやることで自由にカスタマイズできます。

const stream = new ReadStream(underlyingSource, queueingStrategy);

underlyingSourceは、ReadableStreamの読み出し元を定義します。ファイルであるかもしれませんし、タイマーかもしれません。そこはプログラマの自由です。

queueingStrategyは「細切れ」の最大サイズなどを指定します。不要な場合は省略可能です。

さて、あとはこの2つの引数を作ればいいのですが、これらが意外と複雑な構造をしています。

underlyingSource

まずunderlyingSourceですが、いくつかの関数を持ちます。最低限必要なのはstart(controller)という関数で、ここにストリーム開始時の処理を書きます。start関数はcontrollerという引数を受け取り、controller経由で値をenqueue(キューに追加)することで「新しい値を読みだした」とみなされます。

たとえば1秒ごとにランダムな数字を吐き出すReadableStreamのunderlyingSourceは以下のように定義します:

// ReadableStreamのソース(読み込み元)。
// 最低限start関数が必要。
const underlyingSource  = {
  start(controller) {
    // 1秒ごとにランダムな数字を出力する。
    timerId = setInterval(() => {
      // 0から100までのランダムな数字を生成。
      const randomNum = Math.round(Math.random() * 100);

      // キューに追加する。
      controller.enqueue(randomNum);
    }, 1000);
  }
}

次にこれを使ってReadableStreamを作ります:

// さっきのunderlyingSourceを使ってReadableStreamを作る。
const stream = new ReadableStream(underlyingSource);

あとはこのストリームからgetReader()でリーダーを取得(&自動ロック)して、読み出すだけです:

動作デモを開く

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="utf-8">
    <title>Streams API - ReadableStream(Custom)</title>
  </head>
  <body>
    <!-- 表示する場所。 -->
    <output id="output"></output>

    <!-- スクリプト。 -->
    <script>
      const output = document.body.querySelector('#output');

      let timerId = null;

      // ReadableStreamのソース(読み込み元)。
      // 最低限start関数が必要。
      const underlyingSource  = {
        start(controller) {
          // 1秒ごとにランダムな数字を出力する。
          timerId = setInterval(() => {
            // 0から100までのランダムな数字を生成。
            const randomNum = Math.round(Math.random() * 100);

            // キューに追加する。
            controller.enqueue(randomNum);
          }, 1000);
        }
      }

      // さっきのunderlyingSourceを使ってReadableStreamを作る。
      const stream = new ReadableStream(underlyingSource);

      // 作成したReadableStreamからリーダーを取得する。
      // リーダーを取得するとストリームは自動的にロックされる。
      const reader = stream.getReader();

      // ストリームから順次読み出す関数。
      const readChunk = ({done, value}) => {
        if(done) { return; }
        output.value = value;

        // 次の値を読む。
        reader.read().then(readChunk);
      }

      // 値を読み始める。
      reader.read().then(readChunk);
    </script>
  </body>
</html>

start関数が非同期的である場合、start関数の中でPromiseをreturnすれば非同期的なstartであることを表すことができます。

underlyingSourceには他にも様々な関数が設定できます。例えばストリームをキャンセル(stream.cancel()など)したときに後処理をするcancel(reason)関数を実装することもできます:

const underlyingSource  = {
  start(controller) {
    // 1秒ごとにランダムな数字を出力する。
    timerId = setInterval(() => {
      // 0から100までのランダムな数字を生成。
      const randomNum = Math.round(Math.random() * 100);

      // キューに追加する。
      controller.enqueue(randomNum);
    }, 1000);
  },

  // ストリームをキャンセルしたときなどに呼び出される関数。
  cancel(reason) {
    // タイマーを消す。
    clearInterval(timerId);
  }
}

他にもpull(controller)関数というものもあります。これはReadableStreamの内部キューに空きがあるときに、空きがなくなるまで繰り返し呼び出される関数です。もしpullを非同期処理にしたいなら、pull関数の中でPromiseをreturnすることで、次のpullをPromiseの解決まで待たせることができます。

controllerにはenqueueメソッドだけでなく、ストリームを閉じるclose()メソッドと、エラーを発生させるerror(e)メソッドがあります。closeメソッドはストリームを閉じますが、内部キューのデータを全て処理してから閉じます。内部キューのデータを破棄してすぐさま終了したいときはReadableStreamのcancel()メソッドを使用しましょう。

また、type: ‘bytes’を設定すれば、controllerを通してバイト列としての操作ができるようになります(※現在対応ブラウザなし)。このときautoAllocateChunkSizeという値を一緒に設定しておくと、あらかじめ指定したサイズのArrayBufferを確保してくれます。

const underlyingSource  = {
  start(controller) {
    // 何か処理。
  },
  type: 'bytes',
  autoAllocateChunkSize: 100
}

queueingStrategy

start関数やpull関数についての説明で、「キュー」という言葉が何度か出てきました。ReadableStreamは内部にキューを持っていて、データは一度そこを通ります。

このとき、例えばデータの読み出しよりも書き込みの方が遅かったとしましょう。データ10個を読み込む間に書き込みはデータ1個しか終わっていなかったら、データが9個も余ってしまいます。書き込みは終わらないのに読み出しキューにはどんどん値が溜まっていき、最終的に膨大なデータが溜め込まれてしまいます。

この問題を防ぐための、キューへの読み出し制限をかけるのがqueueingStrategyです。queueingStrategyはReadableStreamを作るときの第二引数に指定します。

queueingStrategyは自分で作ることもできますし、すでに用意されているものを使うこともできます。たとえばあらかじめ用意されているCountQueueingStrategyを使えば、キューへ蓄積する個数を制限することができます。

CountQueueingStrategyを使用した例は以下のようになります:

動作デモを開く

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="utf-8">
    <title>Streams API - ReadableStream(queueingStrategy)</title>
  </head>
  <body>
    <!-- 表示する場所。 -->
    <output id="output"></output>

    <!-- スクリプト。 -->
    <script>
      const output = document.body.querySelector('#output');

      const underlyingSource  = {
        start(controller) {
          // 特に何もしない。
        },
        pull(controller) {
          // pull関数はキューに空きがある限り繰り返し実行され続ける。
          // キューに空き容量がある限り、ランダムな数字を追加する。
          const randomNum = Math.round(Math.random() * 100);
          controller.enqueue(randomNum);
        }
      }

      // あらかじめ用意されているストラテジを使用する。
      // キューの上限を5個にする。
      const queueingStrategy = new CountQueuingStrategy({ highWaterMark: 5 });

      // 上のストラテジを使用するReadableStreamを作成する。
      const stream = new ReadableStream(underlyingSource, queueingStrategy);
      const reader = stream.getReader();

      const readChunk = ({done, value}) => {
        if(done) { return; }
        output.value = value;

        // 次は1秒後に読み込む。
        setTimeout(() => reader.read().then(readChunk), 1000);
      }

      reader.read().then(readChunk);
    </script>
  </body>
</html>

この例では、キューに入るデータを5個までに制限し、pull関数を使ってキューに空きがある限りランダムな数字を追加しています。一方で読み出しは1秒に1度にしています。

なお、queueingStrategyを指定しなかった場合は、自動的にhighWaterMarkが1のCountQueuingStrategyが適用されます。

他にはByteLengthQueuingStrategyというものもあります。こちらはデータのバイト数で制限をかける方式です。

const queuingStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 });

queueingStrategyは自作することもできます。そのときはhighWaterMarkと、size(chunk)関数を実装します。

const strategy = {
  highWaterMark: 500,
  size(chunk) {
    return chunk.length;
  }
}

size(chunk)関数はchunk(細切れデータ)のサイズをどう決めるかを示す関数です。たとえば常に1を返せば、1chunkあたりサイズ1となり、これはCountQueueingStrategyと同じになります。そしてchunkのバイトサイズを返すようにすれば、ByteLengthQueueingStrategyと同じになります。

ReadableStreamを2つに分割する

ReadableStreamからリーダーを取得するとロックがかかって、読み出しが終わってロックが解除されるまでもうひとつのリーダーを取得することはできなくなります。ですが、同じReadableStreamから2つの出力をしたいときもあります。

そういうときはReadableSteeamのtee()メソッドを使用すると、ストリームを2つに分割することができます。

例えば出力をoutput要素とコンソールに分けるには以下のようにします:

動作デモを開く

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="utf-8">
    <title>Streams API - ReadableStream(Tee)</title>
  </head>
  <body>
    <!-- 表示する場所。 -->
    <output id="output"></output>

    <!-- スクリプト。 -->
    <script>
      const output = document.body.querySelector('#output');

      const underlyingSource  = {
        start(controller) {
          setInterval(() => {
            const randomNum = Math.round(Math.random() * 100);
            controller.enqueue(randomNum);
          }, 1000);
        },
      }

      const stream = new ReadableStream(underlyingSource);

      // teeメソッドでストリームを2つに分割する。
      const [stream1, stream2] = stream.tee();
      const reader1 = stream1.getReader();
      const reader2 = stream2.getReader();

      // 2つのストリームに対してそれぞれ別の出力をする。
      const readChunkToOutputElement = ({done, value}) => {
        if(done) { return; }
        output.value = value;
        reader1.read().then(readChunkToOutputElement);
      }

      const readChunkToConsole = ({done, value}) => {
        if(done) { return; }
        console.log(value);
        reader2.read().then(readChunkToConsole)
      }

      reader1.read().then(readChunkToOutputElement);
      reader2.read().then(readChunkToConsole);
    </script>
  </body>
</html>

細切れにデータを書き出すWritableStream

次はWritableStreamに触れてみましょう。WritableStreamはReadableStreamとは逆に、細切れにデータを書き込むためのストリームです。書き込み先はファイルでも、コンソールでも、DOMでも、ネット上のAPIでも、どこでもかまいません。

WritableStreamの作り方はReadableStreamとほぼ同じです。以下のようなシグネチャになっています:

const stream = new WritableStream(underlyingSink, queueingStrategy);

ReadableStreamと同じですね。queueingStrategyは省略可能です。省略した場合はhighWaterMarkが1のCountQueueingStrategyが使用されます。

underlyingSinkにはReadableStreamと同様、いくつかの関数を実装します。使用できるのは以下の関数です:

  • start(controller)
  • write(chunk,controller)
  • close(controller)
  • abort(reason)

実装が必須な関数はないので、空のオブジェクトでもかまいません。空オブジェクトにすると何もしないWritableStreamになってしまいますが……。それぞれの関数について、start、write、closeは説明不要でしょう。abortはエラーまたはwriter.abort()などで突然終了したときに呼び出される関数です。非同期処理の場合、それぞれの関数はPromiseを返せば適切に処理してくれるようになっています。

WritableStreamの使い方もReadableStreamとほぼ変わりません。getWriter()でライターを取得してロックし、ライターでwrite(chunk)してデータを書き込むだけです。

たとばoutput要素にデータを表示(書き込む)するWritableStreamは、以下のようにします:

動作デモを開く

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="utf-8">
    <title>Streams API - WritableStream(Simple)</title>
  </head>
  <body>
    <!-- 表示する場所。 -->
    <output id="output"></output>

    <!-- スクリプト。 -->
    <script>
      const output = document.body.querySelector('#output');

      // WritableStreamのシンク(書き出し先)。
      // 実装が必須な関数は無し。つまり空オブジェクトでもOK。
      const underlyingSink = {
        write(chunk, controller) {
          output.value = chunk;
        }
      }

      // underlyingSinkを使ってWritableStreamを作る。
      const stream = new WritableStream(underlyingSink);

      // ライターを取得し、ロックする。
      const writer = stream.getWriter();

      // ライターの準備をreadyで待つ。readyはPromise。
      // 準備ができていたらライターにデータをwriteする。writeもPromiseを返す。
      // 全部のデータを書き込み終わったらcloseする。closeもPromiseを返す。
      writer.ready
        .then(() => writer.write('Hello!'))
        .then(() => console.log('write done.'))
        .then(() => writer.close())
        .then(() => console.log('close writer.'));
    </script>
  </body>
</html>

注意点としては、ライターのreadyプロパティを見て、準備が完了したかを判断してから書き込み始めるというところです。readyプロパティはPromiseになっており、thenでつなぐことができます。

readyが解決したらwriteメソッドやcloseメソッドで操作していきますが、これらもPromiseを返すメソッドであることに注意してください。

ReadableStreamとWritableStreamを繋ぐ

ReadableStreamとWritableStreamはそれら単体で使うことはあまりありません。ReadableStreamから読みだしたデータをWritableStreamで書き出すのが一般的でしょう。

ふたつのストリームをつなぐには、ReadableStreamのpipeTo(writableStream)メソッドを使用します。これだけでReadableStreamから出てきたデータを自動的にWritableStreamが処理してくれます:

動作デモを開く

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="utf-8">
    <title>Streams API - Pipe Two Streams</title>
  </head>
  <body>
    <!-- 表示する場所。 -->
    <output id="output"></output>

    <!-- スクリプト。 -->
    <script>
      const output = document.body.querySelector('#output');

      // 毎秒キューにランダムな数字を追加するReadableStream。
      const underlyingSource = {
        start(controller) {
          setInterval(() => {
            const randomNum = Math.round(Math.random() * 100);
            controller.enqueue(randomNum);
          }, 1000)
        }
      }

      const readableStream = new ReadableStream(underlyingSource);

      // データを読み出しoutput要素に書き込むWritableStream。
      const underlyingSink = {
        write(chunk, controller) {
          output.value = chunk;
        }
      }

      const writableStream = new WritableStream(underlyingSink);

      // ReadableStreamとWritableStreamを接続する。
      readableStream.pipeTo(writableStream);
    </script>
  </body>
</html>

また、オプションを指定することもできます。オプションは3つあります:

const options = {
  preventClose: true,
  preventAbort: true,
  preventCancel: true
}
readableStream.pipeTo(writableStream, options);

preventCloseをtrueに設定すると、ReadableStreamが閉じたときに一緒にWritableStreamが閉じないようにできます。preventAbortはReadableStreamがabortしたときにWritableStreamが連動してabortしないようにできます。preventCancelだけは向きが逆で、WritableStreamでエラーが起きたときにReadableStreamをcancelしないようにする設定です。

細切れに変形するTransformStream

最後にTransformStreamを紹介します。TransformStreamはReadableStreamとWritableStreamの間に挟まり、データの変換を行うストリームです。

使い方はReadableStreamやWritableStreamとほぼ同じで、いくつかの関数と、必要であればqueueingStrategyを渡してやるだけです:

const transformStream = new TransformStream(transformer, writableStrategy, readableStrategy);

ストラテジがwritableとreadableのふたつあるのが違う点ですね。ストラテジを省略すると、writableStrategyはhighWaterMarkが1の、readableStrategyはhighWaterMarkが0のCountQueuingStrategyが適用されます。

TransformStreamに渡す関数は以下の通りです:

  • start(controller)
  • transform(chunk, controller)
  • flush(controller)

それぞれの関数は省略できます。非同期処理になる場合は、関数内でPromiseを返せばうまく処理してくれます。transform関数が変換を行う部分です。受け取ったchunkを加工してcontrollerにenqueueしてやることで加工済みデータをWritableStreamに渡すことができます。

startとflushはデータの一番初めの読み始めと、一番最後の読み終わりに呼び出されます。これらの関数でcontrollerにenqueueしてやることで「一番最初に来るデータ」と「一番最後に来るデータ」を作ることができます。

作成したTransformStreamはReadableStreamのpipeThrough(transformStream)メソッドを使うことで接続できます。ReadableStream -> TransformStream -> WritableStreamの順番で接続します:

readableStream
  .pipeThrough(transformStream)
  .pipeTo(writableStream);

これを使って、たとえば値を100倍にするTransformStreamを作ってみましょう。以下のようになります:

動作デモを開く

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="utf-8">
    <title>Streams API - Transform Stream</title>
  </head>
  <body>
    <!-- 表示する場所。 -->
    <output id="output"></output>

    <!-- スクリプト。 -->
    <script>
      const output = document.body.querySelector('#output');

      // 毎秒キューにランダムな数字を追加するReadableStream。
      const underlyingSource = {
        start(controller) {
          setInterval(() => {
            const randomNum = Math.round(Math.random() * 100);
            controller.enqueue(randomNum);
          }, 1000)
        }
      }

      const readableStream = new ReadableStream(underlyingSource);

      // データを読み出しoutput要素に書き込むWritableStream。
      const underlyingSink = {
        write(chunk, controller) {
          output.value = chunk;
        }
      }

      const writableStream = new WritableStream(underlyingSink);

      // 値を100倍にするTransformStream。
      const transformer = {
        transform(chunk, controller) {
          controller.enqueue(chunk * 100);
        }
      }

      const transformStream = new TransformStream(transformer);

      // ReadableStreamとTransformStreamとWritableStreamを接続する。
      readableStream
        .pipeThrough(transformStream)
        .pipeTo(writableStream);
    </script>
  </body>
</html>

まとめ

Streams APIを使うことで、細切れのデータを扱うことができました。ReadableStreamで少しずつ読み出し、TransformStreamで少しずつ変形し、WritableStreamで少しずつ書き込むという具合にです。

Streams APIはブラウザの対応状況がまだ芳しくなく、実用には遠いですが、強力なAPIであることに変わりはありません。ぜひ今のうちに使い方を習得して起きましょう。