Skip to content Skip to sidebar Skip to footer

An Rx Observable That Would Act As Replaysubject But Only For The 1st Subscriber?

What's an elegant way of composing an Rx observable which would resemble ReplaySubject, but only emit the accumulated sequence once and for the first subscriber only (when that sub

Solution 1:

I modified slightly the implementation found in a similar question, and changed the name of the class from ReplayOnceSubject to ReplayFirstSubscriberOnlySubject:

publicclassReplayFirstSubscriberOnlySubject<T> : ISubject<T>
{
    privatereadonlyobject _locker = newobject();
    private ISubject<T> _subject = new ReplaySubject<T>();

    publicvoidOnNext(T value) { lock (_locker) _subject.OnNext(value); }
    publicvoidOnError(Exception error) { lock (_locker) _subject.OnError(error); }
    publicvoidOnCompleted() { lock (_locker) _subject.OnCompleted(); }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) thrownew ArgumentNullException(nameof(observer));
        lock (_locker)
        {
            if (_subject is ReplaySubject<T> replaySubject)
            {
                var subject = new Subject<T>();
                var subscription = subject.Subscribe(observer);
                // Now replay the buffered notifications
                replaySubject.Subscribe(subject).Dispose();
                replaySubject.Dispose();
                _subject = subject;
                return subscription;
            }
            elsereturn _subject.Subscribe(observer);
        }
    }
}

This is probably not the most efficient solution, since two different locks are acquired on every operation (the _locker and the internal_gate), but it shouldn't be very bad either.

Post a Comment for "An Rx Observable That Would Act As Replaysubject But Only For The 1st Subscriber?"