I need to transform a stream of objects into a stream of batches of objects, grouping them by a property value, using Reactive Extensions:
class Record
{
    public string Group;
    public int Value;
}

IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
    // ...
}
A batch is finished and sent to the output stream when either of these happens:
- A new object comes from the source stream and its Group value differs from the previous value
- There have been no new objects in the source stream for N seconds
For example, if a1 means new Record { Group = "a", Value = 1 }:
input: -a1-a2-a3-b1-b2-
output: -[a1, a2, a3]-[b1, b2]-
input: -a1-a2----------a3-
output: -[a1, a2]-------[a3]-
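To pin down the grouping I expect, here is a minimal, non-reactive sketch of the two rules applied to a list of timestamped records. It is not an Rx solution, only a reference model of the expected batches. The names `BatchingSpec` and `Split` and the explicit timestamps are hypothetical and exist only for illustration. It also ignores when a batch is emitted: in the reactive version, a batch should be emitted as soon as the timeout elapses rather than when the next record arrives.

```csharp
using System;
using System.Collections.Generic;

class Record
{
    public string Group;
    public int Value;
}

static class BatchingSpec
{
    // Hypothetical reference model (not Rx): splits timestamped records into
    // batches. A new batch starts when the Group differs from the previous
    // record's Group, or when the gap since the previous record is at least
    // the timeout.
    public static List<List<Record>> Split(
        IEnumerable<(TimeSpan At, Record Item)> input, TimeSpan timeout)
    {
        var batches = new List<List<Record>>();
        List<Record> current = null;
        var lastAt = TimeSpan.Zero;

        foreach (var (at, item) in input)
        {
            bool groupChanged = current != null
                && current[current.Count - 1].Group != item.Group;
            bool timedOut = current != null && at - lastAt >= timeout;

            if (current == null || groupChanged || timedOut)
            {
                current = new List<Record>();
                batches.Add(current);
            }

            current.Add(item);
            lastAt = at;
        }

        return batches;
    }
}
```

With a 5-second timeout, records a1, a2, a3, b1, b2 arriving one second apart should fall into two batches, and a1, a2 followed by a3 after a long pause should also fall into two batches, matching the diagrams above.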
I have tried various combinations of GroupByUntil, Debounce, Buffer, and Timer, to no avail. How can this be done?