Interface Gatherer<T,A,R>
- Type Parameters:
T - the type of input elements to the gatherer operation
A - the potentially mutable state type of the gatherer operation (often hidden as an implementation detail)
R - the type of output elements from the gatherer operation
Gatherer is a preview API of the Java platform. Programs can only use Gatherer when preview features are enabled.
Gatherer operations can be performed either sequentially, or be parallelized -- if a combiner function is supplied.
There are many examples of gathering operations, including but not limited to: grouping elements into batches (windowing functions); de-duplicating consecutively similar elements; incremental accumulation functions (prefix scan); incremental reordering functions, etc. The class Gatherers provides implementations of common gathering operations.
- API Note:
A Gatherer is specified by four functions that work together to process input elements, optionally using intermediate state, and optionally perform a final action at the end of input. They are:
- creating a new, potentially mutable, state (initializer())
- integrating a new input element (integrator())
- combining two states into one (combiner())
- performing an optional final action (finisher())
Each invocation of initializer(), integrator(), combiner(), and finisher() must return a semantically identical result.
Implementations of Gatherer must not capture, retain, or expose to other threads the references to the state instance or to the downstream Gatherer.Downstream for longer than the duration of the invocation of the method to which they are passed.
Performing a gathering operation with a Gatherer should produce a result equivalent to:
    Gatherer.Downstream<? super R> downstream = ...;
    A state = gatherer.initializer().get();
    for (T t : data) {
        gatherer.integrator().integrate(state, t, downstream);
    }
    gatherer.finisher().accept(state, downstream);
However, the library is free to partition the input, perform the integrations on the partitions, and then use the combiner function to combine the partial results to achieve a gathering operation. (Whether this performs better or worse depends on the specific gathering operation and on the relative cost of the integrator and combiner functions.)
In addition to the predefined implementations in Gatherers, the static factory methods of(...) and ofSequential(...) can be used to construct gatherers. For example, you could create a gatherer that implements the equivalent of Stream.map(java.util.function.Function) with:
    public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
        return Gatherer.of(
            (unused, element, downstream) -> // integrator
                downstream.push(mapper.apply(element))
        );
    }
Gatherers are designed to be composed; two or more Gatherers can be composed into a single Gatherer using the andThen(Gatherer) method.
    // using the implementation of `map` as seen above
    Gatherer<Integer, ?, Integer> increment = map(i -> i + 1);
    Gatherer<Object, ?, String> toString = map(i -> i.toString());
    Gatherer<Integer, ?, String> incrementThenToString = increment.andThen(toString);
As an example, a Gatherer implementing a sequential Prefix Scan could be written as follows:
    public static <T, R> Gatherer<T, ?, R> scan(
            Supplier<R> initial,
            BiFunction<? super R, ? super T, ? extends R> scanner) {
        class State {
            R current = initial.get();
        }
        return Gatherer.<T, State, R>ofSequential(
            State::new,
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                state.current = scanner.apply(state.current, element);
                return downstream.push(state.current);
            })
        );
    }
Example of usage:
    // will contain: ["1", "12", "123", "1234", "12345", "123456", "1234567", "12345678", "123456789"]
    List<String> numberStrings =
        Stream.of(1,2,3,4,5,6,7,8,9)
              .gather(
                  scan(() -> "", (string, number) -> string + number)
              )
              .toList();
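The windowing case mentioned at the top of this page can be sketched with the three-argument ofSequential factory. The following is a minimal illustration, not the JDK's own implementation: the class name BatchExample and the helper batch are hypothetical, and it assumes a JDK on which Gatherer is available (with preview features enabled where it is still a preview API). The finisher is what flushes the last, possibly incomplete, batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class BatchExample {
    // Hypothetical helper: groups input elements into lists of at most `size` elements.
    static <T> Gatherer<T, ?, List<T>> batch(int size) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be positive");
        }
        // Mutable state: the batch currently being filled.
        class State {
            List<T> buffer = new ArrayList<>(size);
        }
        return Gatherer.<T, State, List<T>>ofSequential(
            State::new,
            // Integrator: buffer the element, emit the batch once it is full.
            (state, element, downstream) -> {
                state.buffer.add(element);
                if (state.buffer.size() < size) {
                    return true; // keep consuming input
                }
                List<T> full = state.buffer;
                state.buffer = new ArrayList<>(size);
                return downstream.push(full);
            },
            // Finisher: emit the trailing partial batch, if any.
            (state, downstream) -> {
                if (!state.buffer.isEmpty()) {
                    downstream.push(state.buffer);
                }
            }
        );
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 4], [5]]
        System.out.println(Stream.of(1, 2, 3, 4, 5).gather(batch(2)).toList());
    }
}
```

The gatherer is declared sequential because batch boundaries depend on encounter order across the whole input; no combiner is supplied.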
- Implementation Requirements:
- Libraries that implement transformations based on Gatherer, such as Stream.gather(Gatherer), must adhere to the following constraints:
- Gatherers whose initializer is defaultInitializer() are considered to be stateless, and invoking their initializer is optional.
- Gatherers whose integrator is an instance of Gatherer.Integrator.Greedy can be assumed not to short-circuit, and the return value of invoking Gatherer.Integrator.integrate(Object, Object, Downstream) does not need to be inspected.
- The first argument passed to the integration function, both arguments passed to the combiner function, and the argument passed to the finisher function must be the result of a previous invocation of the initializer or combiner functions.
- The implementation should not do anything with the result of any of the initializer or combiner functions other than to pass them again to the integrator, combiner, or finisher functions.
- Once a state object is passed to the combiner or finisher function, it is never passed to the integrator function again.
- When the integrator function returns false, it shall be interpreted just as if there were no more elements to pass it.
- For parallel evaluation, the gathering implementation must ensure that the input is properly partitioned, that partitions are processed in isolation, and that combining happens only after integration is complete for both partitions.
- Gatherers whose combiner is defaultCombiner() may only be evaluated sequentially. All other combiners allow the operation to be parallelized by initializing each partition in separation, invoking the integrator until it returns false, then joining each partition's state using the combiner, and then invoking the finisher on the joined state. Outputs and state later in the input sequence will be discarded if processing an earlier partition short-circuits.
- Gatherers whose finisher is defaultFinisher() are considered to not have an end-of-stream hook, and invoking their finisher is optional.
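The sequential subset of these constraints can be illustrated with a small evaluator. This is only a sketch under stated assumptions: SequentialEvaluation and evaluate are hypothetical names, the code is not how Stream.gather is implemented, and it ignores parallel evaluation and the Greedy optimization entirely. It skips the initializer for stateless gatherers, stops integrating as soon as the integrator returns false, and skips the finisher when it is the default one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

public class SequentialEvaluation {
    // Hypothetical helper, not part of the JDK: runs a gatherer sequentially over `data`.
    static <T, A, R> List<R> evaluate(Gatherer<T, A, R> gatherer, Iterable<T> data) {
        List<R> output = new ArrayList<>();
        // A downstream that collects every pushed element and never rejects.
        Gatherer.Downstream<R> downstream = element -> {
            output.add(element);
            return true;
        };

        // Stateless gatherers: invoking the initializer is optional, so skip it.
        Supplier<A> initializer = gatherer.initializer();
        A state = (initializer == Gatherer.<A>defaultInitializer()) ? null : initializer.get();

        // A false return value is treated as if there were no more elements.
        Gatherer.Integrator<A, T, R> integrator = gatherer.integrator();
        for (T element : data) {
            if (!integrator.integrate(state, element, downstream)) {
                break;
            }
        }

        // No end-of-stream hook: invoking the default finisher is optional.
        BiConsumer<A, Gatherer.Downstream<? super R>> finisher = gatherer.finisher();
        if (finisher != Gatherer.<A, R>defaultFinisher()) {
            finisher.accept(state, downstream);
        }
        return output;
    }
}
```

Once the loop ends, the state is handed only to the finisher and never to the integrator again, which matches the ordering constraint above.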
- Since:
- 22
-
Nested Class Summary
Modifier and Type | Interface | Description
static interface | Gatherer.Downstream | Preview. A Downstream object is the next stage in a pipeline of operations, to which elements can be sent.
static interface | Gatherer.Integrator | Preview. An Integrator receives elements and processes them, optionally using the supplied state, and optionally sends incremental results downstream.
-
Method Summary
Modifier and Type | Method | Description
default <RR> Gatherer<T,?,RR> | andThen(Gatherer<? super R,?,? extends RR> that) | Returns a composed Gatherer which connects the output of this Gatherer to the input of that Gatherer.
default BinaryOperator<A> | combiner() | A function which accepts two intermediate states and combines them into one.
static <A> BinaryOperator<A> | defaultCombiner() | Returns a combiner which is the default combiner of a Gatherer.
static <A,R> BiConsumer<A, Gatherer.Downstream<? super R>> | defaultFinisher() | Returns a finisher which is the default finisher of a Gatherer.
static <A> Supplier<A> | defaultInitializer() | Returns an initializer which is the default initializer of a Gatherer.
default BiConsumer<A, Gatherer.Downstream<? super R>> | finisher() | A function which accepts the final intermediate state and a Gatherer.Downstream object, allowing a final action to be performed at the end of input elements.
default Supplier<A> | initializer() | A function that produces an instance of the intermediate state used for this gathering operation.
Gatherer.Integrator<A,T,R> | integrator() | A function which integrates provided elements, potentially using the provided intermediate state, optionally producing output to the provided Gatherer.Downstream.
static <T,A,R> Gatherer<T,A,R> | of(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator, BinaryOperator<A> combiner, BiConsumer<A, Gatherer.Downstream<? super R>> finisher) | Returns a new, parallelizable, Gatherer described by the given initializer, integrator, combiner and finisher.
static <T,R> Gatherer<T,Void,R> | of(Gatherer.Integrator<Void,T,R> integrator) | Returns a new, parallelizable, and stateless Gatherer described by the given integrator.
static <T,R> Gatherer<T,Void,R> | of(Gatherer.Integrator<Void,T,R> integrator, BiConsumer<Void, Gatherer.Downstream<? super R>> finisher) | Returns a new, parallelizable, and stateless Gatherer described by the given integrator and finisher.
static <T,A,R> Gatherer<T,A,R> | ofSequential(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator) | Returns a new, sequential, Gatherer described by the given initializer and integrator.
static <T,A,R> Gatherer<T,A,R> | ofSequential(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator, BiConsumer<A, Gatherer.Downstream<? super R>> finisher) | Returns a new, sequential, Gatherer described by the given initializer, integrator, and finisher.
static <T,R> Gatherer<T,Void,R> | ofSequential(Gatherer.Integrator<Void,T,R> integrator) | Returns a new, sequential, and stateless Gatherer described by the given integrator.
static <T,R> Gatherer<T,Void,R> | ofSequential(Gatherer.Integrator<Void,T,R> integrator, BiConsumer<Void, Gatherer.Downstream<? super R>> finisher) | Returns a new, sequential, and stateless Gatherer described by the given integrator and finisher.
-
Method Details
-
initializer
A function that produces an instance of the intermediate state used for this gathering operation.
- Implementation Requirements:
- The implementation in this interface returns defaultInitializer().
- Returns:
- a function that produces an instance of the intermediate state used for this gathering operation
-
integrator
Gatherer.Integrator<A,T,R> integrator()
A function which integrates provided elements, potentially using the provided intermediate state, optionally producing output to the provided Gatherer.Downstream.
- Returns:
- a function which integrates provided elements, potentially using the provided state, optionally producing output to the provided Downstream
-
combiner
A function which accepts two intermediate states and combines them into one.
- Implementation Requirements:
- The implementation in this interface returns defaultCombiner().
- Returns:
- a function which accepts two intermediate states and combines them into one
-
finisher
A function which accepts the final intermediate state and a Gatherer.Downstream object, allowing a final action to be performed at the end of input elements.
- Implementation Requirements:
- The implementation in this interface returns defaultFinisher().
- Returns:
- a function which transforms the intermediate result to the final result(s) which are then passed on to the provided Downstream
-
andThen
Returns a composed Gatherer which connects the output of this Gatherer to the input of that Gatherer.
- Implementation Requirements:
- The implementation in this interface returns a new Gatherer which is semantically equivalent to the combination of this and that gatherer.
- Type Parameters:
RR - the type of output of that Gatherer
- Parameters:
that - the other gatherer
- Returns:
- a composed Gatherer which connects the output of this Gatherer to the input of that Gatherer
- Throws:
NullPointerException - if the argument is null
-
defaultInitializer
Returns an initializer which is the default initializer of a Gatherer. The returned initializer identifies that the owner Gatherer is stateless.
- Implementation Requirements:
- This method always returns the same instance.
- Type Parameters:
A - the type of the state of the returned initializer
- Returns:
- the instance of the default initializer
-
defaultCombiner
Returns a combiner which is the default combiner of a Gatherer. The returned combiner identifies that the owning Gatherer must only be evaluated sequentially.
- Implementation Requirements:
- This method always returns the same instance.
- Type Parameters:
A - the type of the state of the returned combiner
- Returns:
- the instance of the default combiner
-
defaultFinisher
Returns a finisher which is the default finisher of a Gatherer. The returned finisher identifies that the owning Gatherer performs no additional actions at the end of input.
- Implementation Requirements:
- This method always returns the same instance.
- Type Parameters:
A - the type of the state of the returned finisher
R - the type of the Downstream of the returned finisher
- Returns:
- the instance of the default finisher
-
ofSequential
static <T,R> Gatherer<T,Void,R> ofSequential(Gatherer.Integrator<Void,T,R> integrator)
Returns a new, sequential, and stateless Gatherer described by the given integrator.
- Type Parameters:
T - the type of input elements for the new gatherer
R - the type of results for the new gatherer
- Parameters:
integrator - the integrator function for the new gatherer
- Returns:
- the new Gatherer
- Throws:
NullPointerException - if the argument is null
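As an illustration of a stateless, sequential, short-circuiting gatherer, the following sketch reimplements the behaviour of Stream.takeWhile. The class TakeWhileExample and the helper takeWhile are hypothetical names chosen for this example, and the code assumes a JDK on which Gatherer is available (with preview features enabled where required). Returning false from the integrator signals that no further input is wanted.

```java
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class TakeWhileExample {
    // Hypothetical helper: forwards elements until the predicate first fails.
    static <T> Gatherer<T, ?, T> takeWhile(Predicate<? super T> predicate) {
        return Gatherer.<T, T>ofSequential(
            // Stateless integrator: the first argument is unused.
            // Evaluates to false, and so short-circuits, when the predicate
            // fails or when the downstream rejects further elements.
            (unused, element, downstream) ->
                predicate.test(element) && downstream.push(element)
        );
    }

    public static void main(String[] args) {
        // prints [1, 2, 3]
        System.out.println(
            Stream.of(1, 2, 3, 10, 4).gather(TakeWhileExample.<Integer>takeWhile(i -> i < 5)).toList());
    }
}
```

The integrator is deliberately not wrapped with Gatherer.Integrator.ofGreedy, since a greedy integrator is assumed never to short-circuit.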
-
ofSequential
static <T,R> Gatherer<T,Void,R> ofSequential(Gatherer.Integrator<Void,T,R> integrator, BiConsumer<Void, Gatherer.Downstream<? super R>> finisher)
Returns a new, sequential, and stateless Gatherer described by the given integrator and finisher.
- Type Parameters:
T - the type of input elements for the new gatherer
R - the type of results for the new gatherer
- Parameters:
integrator - the integrator function for the new gatherer
finisher - the finisher function for the new gatherer
- Returns:
- the new Gatherer
- Throws:
NullPointerException - if any argument is null
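A stateless gatherer can still act at the end of input through its finisher. The sketch below passes every element through unchanged and then emits one extra trailing element. AppendExample and append are hypothetical names used only for illustration, and the code assumes a JDK on which Gatherer is available.

```java
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class AppendExample {
    // Hypothetical helper: passes all elements through, then emits `last`.
    static <T> Gatherer<T, ?, T> append(T last) {
        return Gatherer.<T, T>ofSequential(
            // Integrator: forward each element as-is.
            (unused, element, downstream) -> downstream.push(element),
            // Finisher: runs once at the end of input; the state argument is unused.
            (unused, downstream) -> downstream.push(last)
        );
    }

    public static void main(String[] args) {
        // prints [a, b, z]
        System.out.println(Stream.of("a", "b").gather(append("z")).toList());
    }
}
```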
-
ofSequential
static <T,A,R> Gatherer<T,A,R> ofSequential(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator)
Returns a new, sequential, Gatherer described by the given initializer and integrator.
- Type Parameters:
T - the type of input elements for the new gatherer
A - the type of state for the new gatherer
R - the type of results for the new gatherer
- Parameters:
initializer - the initializer function for the new gatherer
integrator - the integrator function for the new gatherer
- Returns:
- the new Gatherer
- Throws:
NullPointerException - if any argument is null
-
ofSequential
static <T,A,R> Gatherer<T,A,R> ofSequential(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator, BiConsumer<A, Gatherer.Downstream<? super R>> finisher)
Returns a new, sequential, Gatherer described by the given initializer, integrator, and finisher.
- Type Parameters:
T - the type of input elements for the new gatherer
A - the type of state for the new gatherer
R - the type of results for the new gatherer
- Parameters:
initializer - the initializer function for the new gatherer
integrator - the integrator function for the new gatherer
finisher - the finisher function for the new gatherer
- Returns:
- the new Gatherer
- Throws:
NullPointerException - if any argument is null
-
of
Returns a new, parallelizable, and stateless Gatherer described by the given integrator.
- Type Parameters:
T - the type of input elements for the new gatherer
R - the type of results for the new gatherer
- Parameters:
integrator - the integrator function for the new gatherer
- Returns:
- the new Gatherer
- Throws:
NullPointerException - if any argument is null
-
of
static <T,R> Gatherer<T,Void,R> of(Gatherer.Integrator<Void,T,R> integrator, BiConsumer<Void, Gatherer.Downstream<? super R>> finisher)
Returns a new, parallelizable, and stateless Gatherer described by the given integrator and finisher.
- Type Parameters:
T - the type of input elements for the new gatherer
R - the type of results for the new gatherer
- Parameters:
integrator - the integrator function for the new gatherer
finisher - the finisher function for the new gatherer
- Returns:
- the new Gatherer
- Throws:
NullPointerException - if any argument is null
-
of
static <T,A,R> Gatherer<T,A,R> of(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator, BinaryOperator<A> combiner, BiConsumer<A, Gatherer.Downstream<? super R>> finisher)
Returns a new, parallelizable, Gatherer described by the given initializer, integrator, combiner and finisher.
- Type Parameters:
T - the type of input elements for the new gatherer
A - the type of state for the new gatherer
R - the type of results for the new gatherer
- Parameters:
initializer - the initializer function for the new gatherer
integrator - the integrator function for the new gatherer
combiner - the combiner function for the new gatherer
finisher - the finisher function for the new gatherer
- Returns:
- the new Gatherer
- Throws:
NullPointerException - if any argument is null
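To show how the four functions fit together for parallel evaluation, here is a minimal sketch of a gatherer that counts its input and emits the total once, at the end of input. CountExample and count are hypothetical names, and the code assumes a JDK on which Gatherer is available. Each partition gets its own Counter from the initializer, the combiner adds two partial counts, and the finisher pushes the joined result.

```java
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

public class CountExample {
    // Hypothetical helper: emits a single element, the number of inputs seen.
    static <T> Gatherer<T, ?, Long> count() {
        // Mutable per-partition state.
        class Counter {
            long value;
        }
        return Gatherer.<T, Counter, Long>of(
            Counter::new,
            // Greedy integrator: never short-circuits, emits nothing per element.
            Gatherer.Integrator.ofGreedy((counter, element, downstream) -> {
                counter.value++;
                return true;
            }),
            // Combiner: merge the right partition's count into the left one.
            (left, right) -> {
                left.value += right.value;
                return left;
            },
            // Finisher: emit the total for the joined state.
            (counter, downstream) -> downstream.push(counter.value)
        );
    }

    public static void main(String[] args) {
        // prints [1000]
        System.out.println(
            IntStream.range(0, 1000).boxed().parallel().gather(CountExample.<Integer>count()).toList());
    }
}
```

Because addition is associative, the result does not depend on how the library partitions the input.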
-