An Rx Observable That Would Act As ReplaySubject But Only For The 1st Subscriber?
What's an elegant way of composing an Rx observable which would resemble ReplaySubject, but only emit the accumulated sequence once and for the first subscriber only (when that sub
Solution 1:
I slightly modified the implementation found in a similar question, and renamed the class from ReplayOnceSubject to ReplayFirstSubscriberOnlySubject:
using System;
using System.Reactive.Subjects;

public class ReplayFirstSubscriberOnlySubject<T> : ISubject<T>
{
    private readonly object _locker = new object();

    // Starts as a ReplaySubject<T>; swapped for a plain Subject<T> on the first subscription.
    private ISubject<T> _subject = new ReplaySubject<T>();

    public void OnNext(T value) { lock (_locker) _subject.OnNext(value); }
    public void OnError(Exception error) { lock (_locker) _subject.OnError(error); }
    public void OnCompleted() { lock (_locker) _subject.OnCompleted(); }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));
        lock (_locker)
        {
            if (_subject is ReplaySubject<T> replaySubject)
            {
                // First subscriber: attach it to a fresh, non-replaying subject.
                var subject = new Subject<T>();
                var subscription = subject.Subscribe(observer);
                // Now replay the buffered notifications
                replaySubject.Subscribe(subject).Dispose();
                // Release the buffer and route all later notifications to the plain subject.
                replaySubject.Dispose();
                _subject = subject;
                return subscription;
            }
            else
                return _subject.Subscribe(observer);
        }
    }
}
This is probably not the most efficient solution, since two different locks are acquired on every operation (the _locker and the internal _gate), but it shouldn't be very bad either.
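To illustrate the intended behavior, here is a minimal usage sketch. It assumes the System.Reactive NuGet package is referenced and that the ReplayFirstSubscriberOnlySubject<T> class above is compiled into the same project; the variable names (first, second, sub1, sub2) are only for illustration.

```csharp
// Usage sketch (top-level statements, C# 9 or later).
// Assumption: ReplayFirstSubscriberOnlySubject<T> from the answer is in this project.
using System;
using System.Collections.Generic;

var subject = new ReplayFirstSubscriberOnlySubject<int>();
var first = new List<int>();
var second = new List<int>();

// These values are buffered, because nobody has subscribed yet.
subject.OnNext(1);
subject.OnNext(2);

// The first subscriber receives the buffered 1 and 2 during Subscribe.
using var sub1 = subject.Subscribe(x => first.Add(x));

// A later subscriber gets no replay, only values emitted from now on.
using var sub2 = subject.Subscribe(x => second.Add(x));

subject.OnNext(3);

Console.WriteLine(string.Join(",", first));  // 1,2,3
Console.WriteLine(string.Join(",", second)); // 3
```

After the first subscription the buffer is disposed, so the memory held by the replayed items is released, and every later subscriber behaves as if it were subscribed to a plain Subject<T>.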