インタフェース | 説明 |
---|---|
BaseStream<T,S extends BaseStream<T,S>> |
ストリームの基礎となるインターフェース。
|
Collector<T,A,R> |
入力要素を可変な結果コンテナに累積し、さらに追加可能な処理として、全ての入力要素が処理された後に累積した結果を最終形に変換する可変的簡約処理。
|
DoubleStream |
プリミティブのdoubleである要素の列であり、逐次集計処理および並列集計処理を備える。
|
DoubleStream.Builder |
DoubleStream の可変なビルダ StreamBuilder はライフサイクルを持ち、要素を追加できる構築中段階から始まり、要素を追加できなくなる構築済段階に移行する。 |
IntStream |
プリミティブのintである要素の列であり、逐次集計処理および並列集計処理を備える。
|
IntStream.Builder |
IntStream の可変なビルダ StreamBuilder はライフサイクルを持ち、要素を追加できる構築中段階から始まり、要素を追加できなくなる構築済段階に移行する。 |
LongStream |
プリミティブのlongである要素の列であり、逐次集計処理および並列集計処理を備える。
|
LongStream.Builder |
LongStream の可変なビルダ StreamBuilder はライフサイクルを持ち、要素を追加できる構築中段階から始まり、要素を追加できなくなる構築済段階に移行する。 |
Stream<T> |
逐次集計処理および並列集計処理を備える要素の列。
|
Stream.Builder<T> |
Stream の可変なビルダこれにより要素を個別に生成してStreamBuilder へ追加して(ArrayList を一時バッファとして使った場合の複製のオーバーヘッド無しに)Stream を作成できるようになる。 |
クラス | 説明 |
---|---|
Collectors |
要素をコレクションに累積する、様々な条件によって要素を要約するなど、様々な便利な簡約処理を実装した
Collector の実装。 |
StreamSupport |
ストリームの作成や操作のための低水準ユーティリティメソッド群。
|
列挙型 | 説明 |
---|---|
Collector.Characteristics |
簡約処理の実装の最適化に利用できる、
Collector の性質を表す特性。 |
int sum = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
ここではCollection<Widget>
であるwidget
をストリームのデータ源(source)として使い、ストリームに対して絞り込み(filter)・写像(map)・簡約(reduce)を実行して赤いウィジェットの重さの和を計算した(和の計算は簡約(reduction)処理の1例である)。
このパッケージで導入するキーとなるインターフェースはストリームである。クラスStream
, IntStream
, LongStream
, DoubleStream
はオブジェクトおよびプリミティブ型int
, long
, double
のストリームである。ストリームとコレクションにはいくつか異なる点がある。
Stream
に対する絞り込みは、元となるコレクションから要素を削除するのではなく、絞り込んだ値からのみなる新しいStream
を生成する。String
を探す」際には全ての入力文字列を調べなくてもよいストリームに対する処理は中間処理(intermediate operation, Stream
を生成する)と末端処理(terminal operation, 値か副作用生成する)に分けられる。中間処理は必ず遅延的である。limit(n)
やfindFirst()
といった短絡的処理を使うと無限のストリームに対する計算が有限の時間で終わるようにできる。Iterator
と同じように、新しいストリームを生成する必要がある。 Collection
からstream()
メソッドおよびparallelStream()
メソッドを利用して入手する。Collection
からstream()
メソッドおよびparallelStream()
メソッドを利用して入手する。Stream.of(Object[])
, IntStream.range(int, int)
, Stream.iterate(Object, UnaryOperator)
といった静的なファクトリメソッドを利用して入手する。BufferedReader.lines()
で得られる。Files
の各メソッドから得られる。Random.ints()
から得られる。BitSet.stream()
, Pattern.splitAsStream(java.lang.CharSequence)
, JarFile.stream()
など、JDKにあるストリーム関連の多数のメソッドから入手する。その他にもストリームの情報源がサードパーティのライブラリからこれらの手法を使って用意されるだろう。
ストリーム処理は中間(intermediate)処理と末端(terminal)処理に分類でき、それらは組み合わされてパイプラインを形成する。ストリームパイプラインは複数の構成要素からなり、データ源(source, Collection
・配列・ジェネレータ関数・IOチャネルなど)の後にStream.filter
やStream.map
といった零個以上の中間処理とStream.forEach
やStream.reduce
といった末端処理が続く。
中間処理は新しいストリームを返す。それらは常に遅延的(lazy)である。Stream.filter
のような中間処理を実行しても実際の絞り込みは実行されず、代わりに最初のStream
の要素のうち与えられたPredicate
にマッチする要素を(走査した際に)含む新しいStream
を作成する。パイプラインの末端処理が実行されるまでパイプラインのデータ源の走査は始まらない。
末端処理はストリームを走査し、成果物を生成するか、副作用を発生させる。末端処理を実行した後はそのストリームは消費されたものとみなされ、その後は利用できなくなる。もし同じ情報源を再び走査したい場合は、元のデータ源から新しいストリームを得る必要がある。末端処理はほとんどの場合即時的(eagar)であり、値を返す前に情報源の走査とパイプラインの処理を完了する。末端処理iterator()
とspliterator()
のみ即時的でない。これらは既存の処理が課題に対して十分でない場合にパイプラインの利用者側でコントロールできる任意の走査を可能とするための「逃げ道」として用意されている。
ストリームを遅延的に処理すると大幅な効率化が可能となる。上記の絞り込み・写像・合計の例などのパイプラインでは、絞り込み・写像・合計はデータに対する1回のパスに融合でき、中間データも最小にできる。また遅延処理は必要でない場合は全てのデータを処理しなくても良いようにできる。例えば「1000文字以上の最初の文字列を探す」といった処理では、情報源にある全ての文字列を調べる必要はなく、目的の性質を持った文字列を探すのに必要な分だけを調べればよい(この挙動は入力のストリームが単に大きいだけでなく、無限である場合にはより重要となる)。
中間処理はさらに状態を持たない(stateless)ものと状態を持つ(stateful)ものに分けられる。filter
やmap
といった状態を持たない処理は、新しい要素を処理する際に前の要素に起因する状態を保持しない。各要素は他の要素に対する処理とは独立して処理できる。distinct
やsorted
といった状態を持つ処理は、新しい要素を処理する際に前の要素に起因する状態を組み入れる場合がある。
状態を持つ処理は結果を生成する前に全ての入力を処理する必要がある場合がある。たとえばストリームを整列した結果はストリームの全ての要素を見るまで全く生成できない。結果として、並列計算において、状態を持つ中間処理を含むパイプラインは複数のパスで実行される必要がある場合がある。状態を持たない中間処理のみを含むパイプラインは、逐次的にも並列にも最小のバッファリングで1パスで実行できる。
さらに、ある種の処理は短絡的(short-circuiting)処理であると言われる。中間処理は、無限の入力を与えられた際に有限のストリームを結果として生成する場合があるならば短絡的である。末端処理は、無限の入力を与えられた際に有限の時間で終了する場合があるならば短絡的である。パイプラインが短絡的処理を含むというのは無限ストリームに対する処理が有限時間で正常に完了するための必要条件であるが十分条件ではない。
明示的なループによる要素の処理は本質的に逐次的である。ストリームは計算を各要素に対する命令的な処理としてではなく集約処理のパイプラインとして捉え直すことで並列処理を容易にする。全てのストリーム処理は逐次的にも並列にも実行できる。JDKで実装されているStream
は並列性が明示的に要求されない限り逐次ストリームを作成する。例えば、Collection
はCollection.stream()
メソッドとCollection.parallelStream()
メソッドを持ち、それぞれ逐次的ストリームと並列ストリームを生成する。IntStream.range(int, int)
などの他のストリームを生じるメソッドは逐次的ストリームを生成するが、結果に対してparallel()
を呼ぶと効率的に並列化できる。前述の「ウィジェットの重さの合計」を問い合わせる処理を並列にするには次のようにする。
int sumOfWeights = widgets.
parallelStream()
.filter(b -> b.getColor() == RED) .mapToInt(b -> b.getWeight()) .sum();
この例における逐次版と並列版の唯一の違いは最初のストリームを「stream()
」ではなく「parallelStream()
」で生成した点である。末端処理が開始した場合、ストリームパイプラインは末端処理が呼び出されたストリームの方針に従って逐次的または並列に実行される。Stream
が逐次的に実行されるか並列に実行されるかはStream#isParallel
によって決定でき、ストリームの方針はsequential()
処理やparallel()
処理によって変更できる。末端処理が開始した場合、ストリームパイプラインは末端処理が呼び出されたストリームのモードに従って逐次的または並列に実行される。
findAny()
のように明示的に非決定的とされない限り、ストリームは逐次的に実行されても並列に実行されても計算結果を変えないはずである。
ほとんどのストリーム処理はユーザが指定した挙動を表すパラメータを受け取る。そのパラメータはラムダ式である場合が多い。挙動の正しさを維持するためには、それらの挙動パラメータ(behavioral parameter)は非干渉的(non-interfering)である必要があり、さらに多くの場合は状態を持たない(stateless)必要がある。このようなパラメータは常にFunction
といった関数的インターフェース(functional interface)のインスタンスであり、λ式やメソッド参照である場合が多い。
ArrayList
などのスレッド安全でないコレクションを含む様々なデータ源に対しても、並列な処理も含めた集計処理をできるようになる。これはストリームパイプラインを実行している間、情報源に対する干渉(interference)を防げる場合のみ可能となる。逃げ道であるiterator()
処理およびspliterator()
処理を除き、パイプラインの実行は末端処理が呼び出された際に開始し、末端処理が完了した時点で終了する。ほとんどの情報源に対しては、干渉を防ぐということはストリームパイプラインの実行中に情報源が全く変更されないのを保障するということとなる。これに対する重要な例外は情報源が並行的(concurrent)な変更を扱うように特別に設計された並行コレクションであるようなストリームである。並行ストリーム源とはそのSpliterator
がCONCURRENT
特性を示すものである。 したがって、情報源が並行的でない可能性があるストリームパイプラインの挙動パラメターはストリームの情報源を決して変更するべきではない。挙動パラメータはストリームの並行でない情報源を変更したり変更させたりするとき、情報源に干渉する(interfere)という非干渉性は並列なパイプラインだけでなく、全てのパイプラインに対して求められる。ストリームの情報源が並行でない限り、ストリームパイプラインの実行中にストリームの情報源を変更すると例外・不正な答え・一般的でない結果を引き起こす。行儀が良いストリーム源の場合、末端処理を開始する前なら情報源を変更でき、それらの変更はストリームが扱う要素として反映される。例えば、以下のコードについて考える。
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" "));
最初に"one"と"two"の2つの要素を持つリストが作成される。そしてストリームがそのリストから作成される。次にリストは3つ目の文字列"three"の追加により変更される。最後にストリームの要素は収集され、共に結合される。リストは末端のcollect
処理が開始する前に変更されたので、結果は文字列"one two three"となる。JDKの全てのコレクションや他のほとんどのJDKクラスから返されるストリームはこの意味で行儀が良い。他のライブラリから生成されるストリームに関して、行儀が良いストリームを構築するための必要条件については低水準ストリーム構築を参照せよ。 map()
へのパラメータである。
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
ここでもしマップ処理が並列に実行されると、同じ入力に対しても、スレッドのスケジューリングの違いにより、実行するごとに異なる結果となる場合がある。一方、状態を持たないラムダ式であれば結果は常に同じとなる。 挙動パラメータから可変的な状態にアクセスしようとするのは安全性と効率の面で悪い選択となることにも注意されたい。もしその状態へのアクセスを同期させなければデータ競合(data race)を起こし、したがってコードは正しくなくなるだろう。しかし状態へのアクセスを同期させれば得ようとしている並列性を損なうロック競合(contention)が発生する。最も良い方法はストリーム処理に対して状態を持つ挙動パラメータを完全に避けるという方法である。ストリームパイプラインにおいて状態を避けるように再編する方法が通常はある。
もし挙動パラメータが副作用を持つと、特に明示されない限り他のスレッドに対する副作用の可視性は保障されないし、同じストリームパイプラインの中の「同じ」要素に対する異なる処理が同じスレッドで実行されるという保障もない。さらにそれらの作用の順番は予想できないものとなる場合がある。パイプラインがストリーム源に対する出現順(encounter order)に対して変わらない結果を生成するように制約されていたとしても(例えばIntStream.range(0,5).parallel().map(x -> x*2).toArray()
は[0, 2, 4, 6, 8]
を生成する必要がある)、各要素に対して写像関数が適用される順番に対する保障は成されないし、与えられた要素に対して挙動パラメータが実行されるスレッドの保障も無い。
副作用を使いたくなるような多くの計算は、可変な累積変数ではなく簡約を利用するなどして、副作用を使わずにより安全で効率的に表現できる。ただし、デバッグ目的でのprintln()
の利用などは通常は無害である。forEach()
やpeek()
といった少数のストリーム処理は副作用のみを処理できる。それらは注意して使う必要がある。
不適切に副作用を使うストリームパイプラインを使わないものに変換する方法の例として、次のコードは文字列のストリームから与えられた正規表現に適合する文字列を探して適合結果をリストに保存する。
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
このコードは不必要に副作用を利用している。並列に実行した場合、ArrayList
のスレッド非安全性は間違った結果をもたらし、必要な同期を含めるとロック競合が起きて並列性の利点を損なってしまう。さらに、ここでの副作用の利用は完全に不必要である。forEach()
はより安全でより効率的でより並列化に向いた簡約処理に置き換えられる。
List<String>results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // 副作用無し!
ストリームは出現順順序(encounter order)を持っている場合と持っていない場合がある。出現順順序の有無は情報源と中間処理によって決まる。ある種のストリーム源(List
や配列など)は本質的に順序付けられている(ordered)が、他のもの(HashSet
など)はそうでない。sorted()
などのある種の中間処理は順序付けられていないストリームに出現順順序を導入する場合がある。一方で別の処理は順序付けられたストリームを順序付けられていない状態にする場合がある(BaseStream.unordered()
など)。さらに、ある種の末端処理は出現順順序を無視する(forEach()
など)。
もしストリームが順序付けられている場合、ほとんどの処理は要素を出現順に処理するよう制約される。もしストリームが[1, 2, 3]
を含むList
である場合、map(x -> x*2)
を実行した結果は[2, 4, 6]
である必要がある。しかし、もしデータ源に出現順順序が定められていなければ、値[2, 4, 6]
の任意の順列はどれも妥当な結果となる。
逐次ストリームに対しては、順序付けは決定性にのみ影響し、効率には影響しない。もしストリームが順序付けられていれば、同じ情報源に対する同じストリームパイプラインの実行は同じ結果をもたらす。もしストリームが順序付けられていなれば、繰り返し実行はは異なる結果をもたらす場合がある。
並列ストリームに対しては、順序の制約を緩めるとより効率的な実行が可能となる場合がある。重複の除去(distinct()
)やグループ化(Collectors.groupingBy()
)などのある種の集約処理は要素の順序が重要でなければより効率的に実装できる。同様に、limit()
などのように本質的に出現順と結び付けられている処理は正しい順序を保障するためにバッファリングが必要となる場合があり、並列性の利点を損なう場合がある。ストリームが出現順順序を持つが、ユーザは特に出現順にこだわらない場合、unordered()
を使って明示的にストリームの順序を取り除くと状態を持つ処理や末端処理において並列性能が向上する場合がある。しかしながら、上記の「ウィジェットの重さの和」の例のようなほとんどのストリームパイプラインにおいては、順序の制約があっても効率的に並列化できる。
reduce()
およびcollect()
という複数の形式の汎用簡約処理を備える。また、sum()
, max()
, count()
という複数の形式の、特定の目的のための簡約処理も備える。 もちろん、このような処理は簡単な逐次的ループとして次のように難なく実装できる。
int sum = 0;
for (int x : numbers) {
sum += x;
}
しかし、上記のような可変な累積処理よりも簡約処理を選ぶ尤もな理由がある。ただ単に「より抽象的」である(個別の要素ではなくストリーム全体に対して処理する)というだけでなく、適切に構築された簡約処理は、要素を処理する関数が結合的で状態を持たない限り、本質的に並列化できる。例えば、渡された数値のストリームの和を求めたい場合、次のように書ける。
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
あるいは次のようにも書ける。
int sum = numbers.stream().reduce(0, Integer::sum);
これらの簡約演算はほとんど変更せずとも安全に並列に実行できる。
int sum = numbers.parallelStream().reduce(0, Integer::sum);
簡約の実装はデータの各部分を並列に処理し、中間結果を統合して最終的な正しい答えを得られるため、簡約は上手く並列化できる(たとえ言語が「並列for-each」文を備えていたとしても、可変な累積を使う方法では共有累積変数sum
の更新に対するスレッド安全性を開発者が確保する必要があり、そこで必要となる同期は、並列化から得られる効率の向上をほとんど失なわせてしまうだろう)。代わりにreduce()
を使えば簡約処理を並列化する際の一切の面倒を除去でき、ライブラリは余分な同期処理を必要とせずに効率的な並列実装を提供できるようになる。
以前示した「ウィジェット」の例では、簡約を他の処理と組み合わせて、ループをバルク処理で置き換える様子を示した。もしwidgets
がgetWeight
メソッドを持つWidget
オブジェクトのコレクションである場合、最も重いブロックを次ようにして求められる。
OptionalInt heaviest = widgets.parallelStream()
.mapToInt(Widget::getWeight)
.max();
より一般的な形式では、<T>
型の要素に対して<U>
型の結果を産出するreduce
処理は3つのパラメータを必要とする。
<U> U reduce(U identity,
BiFunction<U, ?super T, U> accumulator,
BinaryOperator<U> combiner);
ここで、単位元(identity)は簡約の最初の種となるとなる値であり、要素が無い場合のデフォルトの結果でもある。累積関数(accumulator)は途中結果と次の要素を受けとり、新しい途中結果を生成する。統合関数(combiner)は2つの途中結果を統合して新しい途中結果を生成する(統合関数は並列簡約において必要である。並列簡約では入力が分割され、部分的な累積結果が各部分に対して計算され、そして最終結果を生成するために中間結果が統合される)。 より形式的には値identity
は統合関数の単位元である必要がある。つまり任意のu
に対して、combiner.apply(identity, u)
がu
に等しいということである。加えて、combiner
関数は結合的であり、accumulator
関数と適合する必要がある。つまり任意のu
とt
についてcombiner.apply(u, accumulator.apply(identity, t))
はaccumulator.apply(u, t)
に対してequals()
の意味で等しい必要がある。
この3引数形式は2引数形式の一般化であり、写像ステップを累積ステップに組み込んだものである。単純な重量の合計の例はより一般的な形式を使い次のように再構成できる。
int sumOfWeights = widgets.stream()
.reduce(0,
(sum, b) -> sum + b.getWeight())
Integer::sum);
ただし明示的な写像・簡約(map-reduce)構成(訳註: map
メソッドとreduce
メソッドに分けて書く形式)の方がより読みやすく、したがって通常はそちらの方が好ましい。一般的な形式は写像と簡約を1つの関数へ統合すると多くの処理が最適化によって取り除ける場合のために用意されている。 Collection
やStringBuilder
などの可変な結果コンテナに累積していく。 例えば、文字列のストリームを受け取り、1つの長い文字列に連結したいとする。これは次のような通常の簡約でも実現できるだろう。
String concatenated = strings.reduce("", String::concat)
これでも望む結果は得られるし、並列にも動作できる。しかし性能は満足いくものではないだろう。このような実装は大量の文字列コピーを実行し、実行時間は要素数に対してO(n^2)となるだろう。より性能面で良い方法は、StringBuilder
という、文字列を累積していく可変的コンテナに結果を累積していく方法だろう。可変的簡約の並列化には通常の簡約の際と同じ手法が使える。
可変的簡約処理はCollection
のような結果コンテナに望む結果を収集(collect)していくため、collect()
と呼ばれる。collect
処理は3つの関数を必要とする。新しい結果コンテナを構築する供給関数(supplier function)・結果コンテナに入力要素を取り入れる累積関数(accumulator function)・ある結果コンテナの中身を別のコンテナに併合する統合関数(combining function)である。この形式は通常の簡約の一般的な形式とよく似ている。
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ?super T> accumulator,
BiConsumer<R, R> combiner);
reduce()
と同様にcollect
をこのような抽象的な形で表す利点は、並列化を直ちに適用できる点である。つまり累積関数と統合関数が要件を満たす限り、中間結果を並列に累積し、その後統合するようにできる。例えば、ストリーム中の要素の文字列表現をArrayList
に集めたい場合、次のような明確な逐次的for-each形式でも書ける。
ArrayList<String> strings = new ArrayList<>();
for (T element : stream) {
strings.add(element.toString());
}
あるいは、並列化可能な収集形式(collect form)で次のようにも書ける。
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
(c, e) -> c.add(e.toString()),
(c1, c2) -> c1.addAll(c2));
もしくは、写像処理を累積関数から抜き出すとさらに簡潔に次のように書ける。
List<String> strings = stream.map(Object::toString)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
ここでは供給関数(supplier)はただのArrayListのコンストラクタ
であり、累積関数は文字列化した要素をArrayList
に加えていき、統合関数は単にaddAll
を使ってあるコンテナから別のコンテナに文字列をコピーする。 collect
の3つの側面、つまり供給関数・累積関数・統合関数は密に結合している。Collector
という概念を使うとこの3つの側面を全て捉えられる。文字列をList
に収集する上記の例は標準のCollector
を使って次のように書き換えられる。
List<String> strings = stream.map(Object::toString)
.collect(Collectors.toList());
可変な簡約をCollectorにまとめると他にも利点が得られる。合成可能性である。Collectors
クラスにはコレクタのファクトリがいくつか予め用意されており、コレクタを他のコレクタに変換する結合子もある。例えば、次のように従業員のストリームに対して給与の和を計算するコレクタがあるとする。
Collector<Employee, ?, Integer> summingSalaries
= Collectors.summingInt(Employee::getSalary);
(第2型変数に対する?
は、単にコレクタが使う中間表現について気にしないという意味である)。もし部門ごとの給与の合計の表を作るコレクタを作りたいなら、groupingBy
を利用してsummingSalaries
を再利用できる。
Map<Department, Integer> salariesByDept
= employees.stream().collect(Collectors.groupingBy(Employee::getDepartment,
summingSalaries));
通常の簡約処理と同じようにcollect()
処理も適切な条件を満たしたときのみ並列化できる。任意の中間結果に対して、空の結果コンテナを統合した場合等価な結果となる必要がある。つまり、いくつかの累積と統合の結果であるp
に対して、p
はcombiner.apply(p, supplier.get())
と等価である必要がある。
さらに、どのように計算が分割されたとしても等価な結果を生成する必要がある。任意の入力t1
とt2
に対し、下記の計算における結果r1
とr2
は等価である必要がある。
A a1 = supplier.get();
accumulator.accept(a1, t1);
accumulator.accept(a1, t2);
R r1 = finisher.apply(a1); // 分割しない場合の結果
A a2 = supplier.get();
accumulator.accept(a2, t1);
A a3 = supplier.get();
accumulator.accept(a3, t2);
R r2 = finisher.apply(combiner.apply(a2, a3)); // 分割した場合の結果
ここで、等価であるとは一般的にはObject.equals(Object)
によるという意味である。ただし、ある場合には等価性は順序の違いを考慮するように緩めることもある。
Map
を生成するcollect()
のように、複雑な簡約処理について考える。
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.collect(Collectors.groupingBy(Transaction::getBuyer));
この処理を並列に実行するのは実際には逆効果である。これは統合段階(Map
をキーに従って他のマップに統合する)の処理はある種のMap
の実装では高価となるためである。 しかし、この簡約で使われる結果コンテナがConcurrentHashMap
のように並行的に変更できるコレクションであったとしよう。その場合、並列に複数起動された累積関数は共有する1つの結果コンテナに結果を累積でき、統合関数が個別の結果コンテナを統合せずに済むようにできるだろう。これを並行簡約と呼ぶ。We call this a concurrent reduction.
並行簡約をサポートするCollector
はCollector.Characteristics.CONCURRENT
特性の印が付けられている。しかし、並行コレクションには欠点もある。もし複数のスレッドが結果を並行に共有コンテナに累積していくと、結果が蓄積される順序は非決定的となる。その結果、並行簡約は処理しているストリームに対して順序が重要でない場合のみ可能となる。Stream.collect(Collector)
の実装は次の条件が満たされるときにのみ並行簡約を実施する。
Collector.Characteristics.CONCURRENT
特性を持つ。Collector.Characteristics.UNORDERED
特性を持つ。 BaseStream.unordered()
メソッドを使うとストリームが順序を持たないことを保障できる。例:
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.unordered()
.collect(groupingByConcurrent(Transaction::getBuyer));
(ここでCollectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>)
はgroupingBy
の並行版である)。 もし与えられたキーに対する要素が情報源の中で出現する順序が重要である場合、順序は並行的な挿入によって失なわれる特性の1つなので、並行簡約は使えないことに注意されたい。その場合、逐次簡約か統合ベースの並列簡約に制限される。
op
は次の条件を満たすとき結合的である。
(a op b) op c == a op (b op c)
並列評価におけるこの性質の重要性はこれを次の4つの式の場合に拡大するとわかる。
a op b op c op d == (a op b) op (c op d)
つまり(a op b)
を(c op d)
と並列に評価し、その後その結果にop
を適用できる。 結合的な演算の例として、数値の加算・min・max・文字列の連結などがある。
Collection.stream()
やArrays.stream(Object[])
といったメソッドを利用していた。これらのストリーム関係のメソッドはどのように実装されているのだろうか。 StreamSupport
クラスはストリームを作るための低水準なメソッドを備えており、それらは全てなんらかの形のSpliterator
を利用している。スプリッテレータは要素の(無限かもしれない)コレクションを表現する。そして逐次的に進めていく・まとめて走査する・並列に処理できる他のスプリッテレータに入力の一部を分割するといった処理を備える。最も下の水準では、全てのストリームはスプリッテレータを使って駆動する。
それらのほとんどは実装の単純さとそのスプリッテレータを使うストリームの実行時のパフォーマンスとのトレードオフである。スプリッテレータと作るのに最も簡単であるが最も非効率的な方法はイテレータからSpliterators.spliteratorUnknownSize(java.util.Iterator, int)
を使って作る方法である。このようなスプリッテレータは動作するものの、貧弱な並列演算性能を示すだろう。なぜなら大きさに関する情報(元となるデータセットがどの程度大きいのか)を失なってしまうし、単純な分割アルゴリスムしか使えないためである。
より高品質のスプリッテレータは均等でサイズが既知となるような分割処理を備え、大きさに関する情報を累積し、スプリッテレータやデータに関する他のいくつもの特性
を備えて実装が実行を最適化するのに使えるようにするだろう。
可変な情報源に対するスプリッテレータには他の課題がある。データを束縛するタイミングである。スプリッテレータが作成されてからストリームパイプラインが実行されるまでの間にデータが変化する可能性があるからだ。理想的にはストリームのスプリッテレータはIMMUTABLE
特性またはCONCURRENT
特性を示す。そうでなければ遅延束縛(late-binding)するべきである。もしデータ源が推奨されるスプリッテレータを直接用意できない場合は、スプリッテレータをSupplier
と、Supplier
を受けとるstream()
を使って間接的に用意してもよい。スプリッテレータはストリームパイプラインの末端処理が開始した後にのみ提供者から取得される。
これらの条件はストリーム源の変更とストリームパイプラインの実行の間で潜在的な干渉が起きる範囲を大幅に低減する。要求された特性を持つスプリッテレータに基づくストリームや、Supplierに基づくファクトリ形式を使うストリームは、末端処理を開始する前になされたデータ源の変更に影響されない(挙動パラメータがが干渉的でなく、状態を持たないという条件を満たす場合)。詳細は非干渉性を参照せよ。