Java 24 chegou e com ele chegaram os Gatherers!

Esse artigo foi o que me deu uma visão geral do que são esses Gatherers, espero que a leitura seja proveitosa pra você também.

O que são Gatherers?

De modo geral, são a generalização de operações intermediárias de uma stream. O exemplo dado por Todd Ginsberg é distinctBy, onde se seleciona os elementos de uma stream, mas que apenas um atenda ao parâmetro de distinção:

Stream.of("A", "B", "CC", "DD", "EEE")
      .distinctBy(it -> it.length())   // <-- Not real!
      .toList();
// Gives: ["A", "CC", "EEE"]

O distinctBy não existe no Java, mas com gatherer você pode implementar. Fica algo assim:

Stream.of("A", "B", "CC", "DD", "EEE")
      .gather(distinctBy(String::length))
      .toList();
// Gives: ["A", "CC", "EEE"]

Vamos implementar?

Bem, como queremos algo distinto, isso significa que nosso gatherer precisa ter estado. Por questões de simplicidades, podemos assumir que a função passada para o gatherer vai gerar uma espécie de chave, que precisa ser única.

Bem, vamos adicionar a chave (e o elemento que gerou a chave) no estado intermediário e, após processar tudo, na hora de finalizar, vou coletar os elementos do estado.

Se eu usar LinkedHashMap para o estado, eu coloco cada chave unicamente com putIfAbsent e, na hora de resgatar os valores com state.values() eles ainda estarão em ordem.

Existe também a eventual necessidade de combinar com estados que vem de outras threads, para isso precisamos fazer um combiner, No caso, quem está no estado a esquerda foi adicionado antes, então posso simplesmente pegar tudo novo do estado a direita e jogar no estado a esquerda.

Aqui uma implementação para testar:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

<A, D> Gatherer<A, Map<D, A>, A> disctincBy(Function<A, D> disctinction) {
	return Gatherer.of(
		LinkedHashMap::new,
		Gatherer.Integrator.ofGreedy( (state, element, downstream) -> {
			state.putIfAbsent(disctinction.apply(element), element);
			return !downstream.isRejecting();
		}),
		(stateLeft, stateRight) -> {
			stateRight.forEach((k, v) -> stateLeft.putIfAbsent(k, v));
			return stateLeft;
		},
		(state, downstream) -> {
			state.values().forEach(downstream::push);
		}
	);
}

final var l = Stream.of("A", "B", "CC", "DD", "EEE")
        .gather(disctincBy(String::length))
        .toList();
System.out.println(l);

Leia mais na JavaDoc de Gatherer

O desafio

Já que gatherers são a generalização das operações intermediárias, isso significa que eu posso trabalhar unicamente com gatherers. Então, vamos brincar de trocar todas as operações intermediárias para gatherers?

Basicamente, onde antes tínhamos algo assim:

Stream.of("A", "B", "CC", "DD", "EEE")
    .map(String::length)
    .toList();

Iremos substituir pelo gatherer map mantendo a mesma API da função intermediária .map:

Stream.of("A", "B", "CC", "DD", "EEE")
    .gather(map(String::length))
    .toList();

Para consultar as operações intermediárias, consultei a javadoc para java.util.stream.Stream e peguei todos os métodos de instância que retornavam outras streams. Mesmo métodos com implementações default vão aqui, apesar de ter implementação default por que não, né?

Para ilustrar as operações intermediárias, vou colocar contra esse conjunto de elementos, por os parâmetros adequados e mostrar a saída esperada:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(...)
    .toList();
// Gives [...]

E agora, com vocês, as operações intermediárias de streams do java, mas agora implementadas com gatherer!

Memória muscular

Bem, fazendo as operações intermediárias, surgiu um pouco de memória muscular na hora de implementar o próximo gatherer. A anatomia da interface é essa:

interface Gatherer<FromUpstream, State, ToDownstream> {...}

Os tipos que chegam no gatherer são o primeiro tipo genérico. O estado (normalmente indicado por ?) é o estado usado pelos gatherers, e tal qual o Collector faz sentido que essa representação intermediária não seja visível para o mundo externo. Assim, ao precisar alterar o estado, não irá quebrar compatibilidade com código existente nem tampouco precisa expor uma classe que só faça sentido dentro de um contexto muito específico.

Falando em estado, se ele não for algum estado baseado obviamente em uma estrutura de dados já presente na sua code base (como, por exemplo, o ArrayList do próprio Java para o intermediário sorted, ou mesmo o LinkedHashSet para distinct), provavelmente o estado é um objeto de uso único local naquela função. Então basicamente, ao precisar de um estado novo, criei a classe dentro do método.

Por exemplo:

<A> Gatherer<A, ?, A> dropWhile(Predicate<? super A> predicate) {
	class State {
		boolean drop = true;
	}

	return Gatherer.ofSequential(State::new,
		Gatherer.Integrator.ofGreedy((state, element, ds) -> {
            if (state.drop) {
                if (!predicate.test(element)) {
                    state.drop = false;
                    return ds.push(element);
                }
                return !ds.isRejecting();
            }
            return ds.push(element);
        })
	);
}

O nome dessa classe não importa, então colocar State acaba sendo um ótimo ponto de partida.

Por fim, o terceiro tipo genérico, que eu representei como ToDownstream, ele representa o tipo que será passado para o downstream, que será consumido posteriormente.

Ao criar o integrator do gatherer, preciso me perguntar uma coisa: o meu integrator vai fazer curto circuito (como por exemplo limit)? Se vai fazer, crio ele com Gatherer.Integrator.of. Caso contrário, o padrão é criar com Gatherer.Integrator.ofGreedy, pois esse gatherer vai tentar consumir tudo até o fim.

Outras coisas para se levar em consideração é se o meu gatherer vai se importar com a ordem com que recebe os argumentos. Por exemplo, o distinctBy acima (que não é operação intermediária) se importa com a ordem, ou então o skip ou o limit. Para esses casos, melhor usar Gatherer.ofSequential. Mas eu posso ter gatherers que não se importam com a ordem, como por exemplo map.

Não se importar com a ordem não quer dizer que o output será bagunçado, apenas que a ordem de recebimento das informações não altera o resultado final. Mesmo o map gera o resultado na mesma ordem de chegada, depois de combinar as streams paralelas.

Para o caso de paralelismo e também de gatherers com estado, é necessário unir os estados para evitar uma surpresa ruim. Exemplo de gatherer stateful com paralelismo? sorted.

Dá para complicar bastante o estado, como por exemplo nas implementações de takeWhile2 (que satisfaz takeWhile) e dropWhile2 (que satisfaz dropWhile), mas quando mais plano for o objeto de estado, mais fácil de entender o que está acontecendo. Além de que a lógica fica mais no próprio integrator.

Ao usar um combiner, se faz necessário usar um finisher.

distinct

distinct claramente precisa guardar estado. Aqui é importante a ordem com que chegam os elementos, então Gatherer.ofSequential.

// Stream<T> distinct()
// aqui uma solução sequencial
<A> Gatherer<A, ?, A> distinct() {
	return Gatherer.ofSequential(
		HashSet::new,
		Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
			if (state.add(element)) {
				return downstream.push(element);
			}
			return !downstream.isRejecting();
		})
	);
}

Mas eu poderia fazer com Gatherer.of contanto que eu tomasse alguns cuidados:

  • o estado agora precisa necessariamente ser LinkedHashSet, pois ele vai guardar a ordem de inserção
  • o combiner deve favorecer o lado esquerdo
  • eu só posso enviar os dados downstream no finisher

É possível adaptar a implementação do disctincBy para satisfazer o distinct, visto que o combiner dele já lida com “favorecer o lado esquerdo”.

Executando:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(distinct())
    .toList();
// Gives ["A", "B", "CC", "DD", "EEE"]

dropWhile

Aqui eu preciso manter estado, apenas pra saber se preciso continuar dropando. Não tenho uso para o estado após virar a chave para aceitar as coisas.

Portanto, se eu ainda estiver em “estado de drop”, eu preciso testar com o predicado passo. O predicado indicando que não precisa mais dropar, altero o estado para “estado de aceite” e não preciso nunca mais testar os elementos.

Como eu preciso dropar as coisas até achar algo novo, esse gatherer é essencialmente sequencial. Eu vou consumir o input inteiro.

// Stream<T> dropWhile(Predicate<? super T> predicate)
// aqui uma solução sequencial
<A> Gatherer<A, ?, A> dropWhile(Predicate<? super A> predicate) {
	class State {
		boolean drop = true;
	}

	return Gatherer.ofSequential(State::new,
		Gatherer.Integrator.ofGreedy((state, element, ds) -> {
            if (state.drop) {
                if (!predicate.test(element)) {
                    state.drop = false;
                    return ds.push(element);
                }
                return !ds.isRejecting();
            }
            return ds.push(element);
        })
	);
}

<A> Gatherer<A, ?, A> dropWhile2(Predicate<? super A> predicate) {
	class State {
		BiPredicate<A, Gatherer.Downstream<? super A>> whatToDo;

		State() {
			this.whatToDo = (e, ds) -> {
				if (!predicate.test(e)) {
					final var pushed = ds.push(e);
					this.whatToDo = (el, ds2) -> ds2.push(el);
					return pushed;
				}
				return !ds.isRejecting();
			};
		}
	}

	return Gatherer.ofSequential(State::new,
		Gatherer.Integrator.ofGreedy((state, element, ds) ->
			state.whatToDo.test(element, ds)
		)
	);
}

Na implementação secundária eu tentei guardar no estado a próxima ação, assim a função whatToDo mantém o que precisa ser feito.

Executando:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(dropWhile(s -> s.length() == 1))
    .toList();
// Gives ["CC", "A", "DD", "EEE"]

filter

No filtro eu não preciso manter estado. A ordem de chegada não afeta o próximo elemento, então posso paralelizar livremente. Vou consumir até o fim.

// Stream<T> filter(Predicate<? super T> predicate)
<A> Gatherer<A, ?, A> filter(Predicate<? super A> predicate) {
    return Gatherer.of(
        Gatherer.Integrator.ofGreedy((state, element, ds) -> {
            if (predicate.test(element)) {
                return ds.push(element);
            }
            return !ds.isRejecting();
        })
    );
}

Executando:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(filter(s -> s.length() == 1))
    .toList();
// Gives ["A", "A", "B", "A", "A"]

flatMap

A ordem também não é importante pra o flatMap, nem tampouco o estado anterior. Irei consumir até o fim

// <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)
<A, R> Gatherer<A, ?, R> flatMap(Function<? super A, ? extends Stream<? extends R>> mapper) {
    return Gatherer.of(
        Gatherer.Integrator.ofGreedy((state, element, ds) -> {
            mapper.apply(element).forEach(ds::push);
			return !ds.isRejecting();
		})
    );
}

Por um tempo fiquei me questionando como eu faria para indicar que o downstream iria receber novos elementos. No fim, a solução é iterar na stream obtida pelo mapper e, para cada elemento da stream, passar adiante. Esse código poderia ser melhor otimizado caso o ds.push recusasse o elemento, mas está bom o suficiente para o meu fim.

Outra coisa que eu me confundi bastante foi na tipagem do downstream. Eu coloquei erroneamente que seria um Gatherer<A, ?, Stream<R>>, mas na verdade eu não vou consumir elementos do tipo Stream<R>, apenas elementos do tipo R. Eu me enganei bastante por conta do tipo do mapper, que é A -> Stream<R>.

Executando:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(flatMap(e -> e.chars().mapToObj(c -> ((char) c) + "")))
    .toList();
// Gives ["A", "A", "B", "A", "C", "C", "A", "D", "D", "E", "E", "E"]

limit

Aqui eu quero realizar um curto circuito! Ao chegar em determinado valor, acabou a streaming de dados! Então Gatherer.Integrator.of.

A ordem de chegada é extremamente importante, pois só vou pegar os maxSize primeiros elementos. O estado indica quantos elementos já foram adicionados ao downstream. Poderia ter feito equivalentemente “quantos elementos faltam”, mas achei mais semântico comparar com state.qnt >= maxSize do que state.remaining > 0.

// Stream<T> limit(long maxSize)
<A> Gatherer<A, ?, A> limit(long maxSize) {
    class State {
        long qnt = 0;
    }
    return Gatherer.ofSequential(
        State::new,
        Gatherer.Integrator.of((state, element, ds) -> {
            if (state.qnt >= maxSize) {
                return false;
            }
            state.qnt++;
			return ds.push(element);
		})
    );
}

Executando:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(limit(3))
    .toList();
// Gives ["A", "A", "B"]

map

Aqui nota que finalmente temos um mapeamento trivial que altera o retorno! Não preciso guardar estado, não preciso me preocupar com o que veio antes e irei consumir tudo até o fim:

// <R> Stream<R> map(Function<? super T, ? extends R> mapper)
<A, R> Gatherer<A, ?, R> map(Function<? super A, ? extends R> mapper) {
    return Gatherer.of(
        Gatherer.Integrator.ofGreedy((state, element, ds) -> ds.push(mapper.apply(element)))
    );
}

Executando:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(map(String::length))
    .toList();
// Gives [1, 1, 1, 1, 2, 1, 2, 3]

mapMulti

Aqui basicamente as mesmas dificuldades do flatMap, mas eu passei antes pela experiência com o flatMap portanto já vim calejado.

// default <R> Stream<R> mapMulti(BiConsumer<? super T, ? super Consumer<R>> mapper)
<A, R> Gatherer<A, ?, R> mapMulti(BiConsumer<? super A, ? super Consumer<R>> mapper) {
    return Gatherer.of(
        Gatherer.Integrator.ofGreedy((state, element, ds) -> {
            mapper.accept(element, (Consumer<R>) ds::push);
            return !ds.isRejecting();
        })
    );
}

Uma coisa que pegou é que ? super Consumer<R> não é Consumer<R>, então ds::push não estava funcionando. ? super Consumer<R> basicamente indica super tipos de Consumer<R>, logo não tem nenhuma garantia de que precisa implementar accept, e é por isso que eu faço o cast (Consumer<T>) ds::push.

Executando:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(mapMulti((s, c) -> {
        for (int i = 0; i < s.length(); i++) {
            c.accept(s.charAt(i));
        }
    }))
    .toList();
// Gives ["A", "A", "B", "A", "C", "C", "A", "D", "D", "E", "E", "E"]

peek

Pego um elemento, olho ele, passo pra frente. Stateless, sem preocupação com ordem de chegada, sem preocupação com curto circuito.

// Stream<T> peek(Consumer<? super T> action)
<A> Gatherer<A, ?, A> peek(Consumer<? super A> action) {
    return Gatherer.of(
        Gatherer.Integrator.ofGreedy((state, element, ds) -> {
            action.accept(element);
            return ds.push(element);
        })
    );
}

Executando:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(peek(System.out::println))
    .toList();
// Gives ["A", "A", "B", "A", "CC", "A", "DD", "EEE"]
// Output:
// A
// A
// B
// A
// CC
// A
// DD
// EEE

skip

Similar ao limit, mas aqui eu quero ir até o fim (portanto Gatherer.Integrator.ofGreedy). Não faz sentido ter uma implementação paralelizada disso, pois preciso ignorar os primeiros n elementos.

Basicamente o estado serve para contar quantos elementos foram criados no meio do caminho.

// Stream<T> skip(long n)
<A> Gatherer<A, ?, A> skip(long n) {
    class State {
        long qnt = 0;
    }
    return Gatherer.ofSequential(
        State::new,
        Gatherer.Integrator.ofGreedy((state, element, ds) -> {
            if (state.qnt < n) {
                state.qnt++;
                return !ds.isRejecting();
            }
            return ds.push(element);
        })
    );
}

Executando:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(skip(3))
    .toList();
// Gives ["A", "CC", "A", "DD", "EEE"]

sorted

Aqui eu só posso passar adiante após o fim da streaming. Mas adivinha uma coisa interessante? A ordem meio que não importa: eu posso acumular as coisas e, na hora de juntar, manter o que tava na esquerda a esquerda, daí a função de sort vai manter a ordenação estável (tendo fé o Timsort, claro).

O estado é basicamente um ArrayList, sem segredos. Para combinar, é adicionar os elementos da direito após os elementos da esquerda, algo assim:

(left, right) -> {
    left.addAll(right);
    return left;
}

Basicamente, na hora de terminar o recebimento precisamos ordenar a lista e, então, passar sortedList.forEach(ds::push). Se usar um for-each clássico dá para otimizar e terminar o laço na primeira falha de ds.push, mas isso funcionou bem o suficiente para mim.

// Stream<T> sorted(Comparator<? super T> comparator)
<A> Gatherer<A, ?, A> sorted(Comparator<? super A> comparator) {
    return Gatherer.of(
		ArrayList<A>::new,
        Gatherer.Integrator.ofGreedy((state, element, ds) -> {
            state.add(element);
            return !ds.isRejecting();
        }),
        (left, right) -> {
            left.addAll(right);
            return left;
        },
        (state, ds) -> {
			state.sort(comparator);
			state.forEach(ds::push);
		}
    );
}

Nota que eu tive tendo problemas com state.sort(comparator) lá no finisher porque eu estava criando o estado com ArrayList::new. E esse tipo o Java não conseguiu inferir nada de interessante, portanto o tipo de state não tinha um generics compatível com o de comparator. Para resolver isso, descobri que posso tipar a lambda de criação usando ArrayList<A>::new.

Finalmente, para o sorted sem argumentos, passei o Comparator.naturalOrder para o sorted com o comparator. Fiz um casting porque o compilador não pode garantir que A extends Comparable<A>, então forcei a barra com (Comparator<A>) Comparator.naturalOrder().

// Stream<T> sorted()
@SuppressWarnings("unchecked")
<A> Gatherer<A, ?, A> sorted() {
    return sorted((Comparator<A>) Comparator.naturalOrder());
}

Executando:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(sorted())
    .toList();
// Gives ["A", "A", "A", "A", "B", "CC", "DD", "EEE"]

Outra execução:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(map(String::length))
    .gather(sorted())
    .toList();
// Gives [1, 1, 1, 1, 1, 2, 2, 3]

takeWhile

Aqui mais um caso de que devemos receber as coisas sequencialmente, pois afinal abortar no primeiro sinal de “não precisa mais”. Ou seja, também precisa fazer curto-circuito.

Eu posso fazer stateless no momento que eu disparar o curto-circutio, pois, afinal, já indico que não irei continuar recebendo novos elementos.

// default Stream<T> takeWhile(Predicate<? super T> predicate)
<A> Gatherer<A, ?, A> takeWhile(Predicate<? super A> predicate) {
    return Gatherer.ofSequential(
        Gatherer.Integrator.of((state, element, ds) -> {
			if (predicate.test(element)) {
				return ds.push(element);
			}
			return false;
		})
    );
}

Antes de chegar na implemtação stateless, passei antes por guardar o estado. Eu poderia estar no estado de “aceitando”, estado “bom”, ou já poderia ter passado dele. Ao recusar o primeiro elemento, precisava atualizar o estado para “não está saudável”. Note que a primeira consulta sempre era ao estado, pra saber se estava saudável o suficiente para continuar.

<A> Gatherer<A, ?, A> takeWhile2(Predicate<? super A> predicate) {
    class State {
        boolean good = true;
    }
    return Gatherer.ofSequential(
        State::new,
        Gatherer.Integrator.of((state, element, ds) -> {
            if (!state.good) {
                return false;
            }
            final var t = predicate.test(element);
            if (t) {
                return ds.push(element);
            }
            return state.good = false;
        })
    );
}

E, finalmente, a implementação baseada na complicação do dropWhile2.

// default Stream<T> takeWhile(Predicate<? super T> predicate)
<A> Gatherer<A, ?, A> takeWhile3(Predicate<? super A> predicate) {
    class State {

        BiPredicate<A, Gatherer.Downstream<? super A>> whatToDo;
        State() {
            whatToDo = (e, d) -> {
                if (!predicate.test(e)) {
                    this.whatToDo = (_, _) -> false;
                    return false;
                }
                return d.push(e);
            }
        }
    }
    return Gatherer.ofSequential(
        State::new,
        Gatherer.Integrator.of((state, element, ds) -> state.whatToDo.test(element, ds))
    );
}

Executando:

final var l = Stream.of("A", "A", "B", "A", "CC", "A", "DD", "EEE")
    .gather(takeWhile(s -> s.length() == 1))
    .toList();
// Gives ["A", "A", "B", "A"]