Stream API предоставляет очень богатый функционал по обработке наборов данных в функциональном стиле. Но что если у нас есть некий набор тип данных, для которого нет возможности создать Stream стандартным образом?
Не пишите свой Stream!
Существует буквально куча способов создать поток из своих данных: можно их в коллекцию обернуть, можно как массив представить, можно использовать функцию-генератор и Stream.iterate() и т.д.
Если использовать уже существующие методы никоим образом невозможно, реализуйте для вашего типа данных интерфейс Iterator и создавайте поток вызовом вспомогательной функции Spliterators.spliterator():
StreamSupport.stream(Spliterators.spliterator(it, dataSize,0), false)Если размер данных неизвестен, можно использовать вызов Spliterators.spliteratorForUnknownSize(), который создаст бесконечный Spliterator, из которого будет создан бесконечный Stream.
В любом случае, гораздо предпочтительнее использовать Iterator и обёртки из класса Spliterators, чем реализовывать Spliterator самому. Эти методы имеют встроенную поддержку параллельной обработки, которая более-менее работоспособна и для бесконечных потоков.
Не пишите свой Stream, пишите Spliterator
Хотя нужен вам конечно Stream, писать мы будем реализацию интерфейса Spliterator, который используется внутри потоков и обеспечивает всю потоковую магию. Интерфейс Spliterator похож на Iterator , но обладает двумя важыми отличиями — можно обработать оставшеся элементы одними можно разделить один Spliterator на два.
Я скажу честно: я потратил два дня, пытаясь придумать реалистичный пример, который требовал бы создания своей реализации Spliterator и …. не смог придумать такого примера, который нельзя было бы свести к рецептам, данным выше. Поэтому я использую достаточно надуманный пример, который я подсмотрел в другом месте: класс по работе с многострочным текстом.
Интерфейс Spliterator определяет 8 методов, для четырёх из которых есть реализации по умолчанию. Мы реализуем шесть методов из восьми:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
public class MultilineString {
private final String data[];
public MultilineString(String source) {
data = source.split("\n");
}
Stream<String> stream() {
return StreamSupport.stream(new MultilineSpliterator(), false);
}
Stream<String> parallelStream() {
return StreamSupport.stream(new MultilineSpliterator(), true);
}
public class MultilineSpliterator implements Spliterator<String> {
private int firstPosition, lastPosition;
public MultilineSpliterator() {
firstPosition = 0;
lastPosition = data.length-1;
}
public MultilineSpliterator(int f, int l) {
firstPosition = f;
lastPosition = l;
}
@Override
public boolean tryAdvance(Consumer<? super String> action) {
if (firstPosition <= lastPosition) {
firstPosition++;
action.accept(data[firstPosition]);
return true;
}
return false;
}
@Override
public void forEachRemaining(Consumer<? super String> action) {
for (;firstPosition <= lastPosition; firstPosition++) {
action.accept(data[firstPosition]);
}
}
@Override
public Spliterator<String> trySplit() {
int half = (lastPosition - firstPosition)/2;
if (half<=1) {
//Not enough data to split
return null;
}
int f = firstPosition;
int l = firstPosition + half;
firstPosition = firstPosition + half +1;
return new MultilineSpliterator(f, l);
}
@Override
public long estimateSize() {
return lastPosition- firstPosition;
}
@Override
public long getExactSizeIfKnown() {
return estimateSize();
}
@Override
public int characteristics() {
return IMMUTABLE | SIZED | SUBSIZED;
}
}
}
|
Самый первый и самый простой метод, который мы реализуем, это характеристики Spliterator
1
2
3
4
|
@Override
public int characteristics() {
return IMMUTABLE | SIZED | SUBSIZED;
}
|
SIZED и SUBSIZED говорит о том, что мы точно знаем размер набора данных и что после разделения Spliterator мы всё ещё точно будем знать размер. IMMUTABLE говорит о том, что исходные данные не могут быть изменены (добавлены или удалены элементы или изменены). Существуют и другие характеристики:
- ORDERED говорит о том, что порядок элементов в Spliterator важен
- SORTED обычно используется с ORDERED и говорит, что элементы в этом Spliterator отсортированы
- DISTINCT говорит что элементы исходного набора данных уникальны.
- NONNULL гарантирует, что в Spliterator нет null элементов.
1
2
3
4
5
6
7
8
9
|
@Override
public long estimateSize() {
return lastPosition- firstPosition;
}
@Override
public long getExactSizeIfKnown() {
return estimateSize();
}
|
1
2
3
4
5
6
7
8
9
10
|
@Override
public boolean tryAdvance(Consumer<? super String> action) {
if (firstPosition <= lastPosition) {
firstPosition++;
action.accept(data[firstPosition]);
return true;
}
return false;
}
|
1
2
3
4
5
6
|
@Override
public void forEachRemaining(Consumer<? super String> action) {
for (;firstPosition <= lastPosition; firstPosition++) {
action.accept(data[firstPosition]);
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Override
public Spliterator<String> trySplit() {
int half = (lastPosition - firstPosition)/2;
if (half<=1) {
//Not enough data to split
return null;
}
int f = firstPosition;
int l = firstPosition + half;
firstPosition = firstPosition + half +1;
return new MultilineSpliterator(f, l);
}
|
- старый Spliterator и новорождённый Spliterator никогда не должны иметь возможности вдвоём обработать какой-либо элемент. Это особенно важно учитывать для граничных элементов.
- estimateSize() старого Spliterator после trySplit() должен возвращать оставшееся после разделения число элементов.
- Операция разделения должна выполняться быстро, желательно чтобы её сложно была O(1) или близкой к тому.
Если Spliterator не удаётся разделить или не хочется разделять или вообще что-то пошло не так, trySplit() может вернуть null как признак того, что разделения не будет, работайте так 🙂
Для использования нового Spliterator добавим вспомогательных методов в класс MultilineString:
1
2
3
4
5
6
7
|
Stream<String> stream() {
return StreamSupport.stream(new MultilineSpliterator(), false);
}
Stream<String> parallelStream() {
return StreamSupport.stream(new MultilineSpliterator(), true);
}
|
И проверим, как они работают:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@Test
public void multilineStreamTest() {
MultilineString testedObject = new MultilineString(text);
assertThat(testedObject
.stream()
.count(),
is(28L));
}
@Test
public void parallelStreamTest() {
MultilineString testedObject = new MultilineString(text);
assertThat(testedObject
.parallelStream()
.filter(StringUtils::isBlank)
.count(), is(14L));
}
|
Код примера доступен на github.