RxJava2 source code analysis

RxJava2 thread scheduling

If you have not yet read RxJava2 analysis (I), please start with https://blog.csdn.net/qq_/article/details/104864276

Questions to answer:

1. How does RxJava2 switch threads?
2. How does RxJava2 keep threads synchronized?
3. How do Schedulers schedule threads?

1. Entry code

Observable.create(new ObservableOnSubscribe<ArrayList<String>>() {
    @Override
    public void subscribe(ObservableEmitter<ArrayList<String>> emitter) throws Exception {
        ArrayList<String> stringArrayList = new ArrayList<>();
        stringArrayList.add("hello");
        emitter.onNext(stringArrayList);
    }
}).doOnNext(new Consumer<ArrayList<String>>() {
    @Override
    public void accept(ArrayList<String> s) throws Exception {
        Log.i("msg2","The first thread starts execution"+Thread.currentThread()+s.toString());
        s.add("First");
    }
}).observeOn(Schedulers.io()).doOnNext(new Consumer<ArrayList<String>>() {
    @Override
    public void accept(ArrayList<String> s) throws Exception {
        Log.i("msg2","The second thread starts execution"+Thread.currentThread()+""+s.toString());
        Thread.sleep(8000);
        s.add("The second");
    }
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<ArrayList<String>>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(ArrayList<String> s) {
        Log.i("msg2","Downstream event thread"+Thread.currentThread()+s.toString());

    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {

    }
});

The log output shows that execution starts on the main thread, switches to another thread, and then switches back to the main thread. Building on the analysis in the previous article, we go straight to the Schedulers.io() method.

//In the Schedulers class
@NonNull
public static Scheduler io() {
    //Returns the static IO scheduler
    return RxJavaPlugins.onIoScheduler(IO);
}
//Let's see what IO is
@NonNull
static final Scheduler IO;
static {
......
       IO = RxJavaPlugins.initIoScheduler(new IOTask());
   ......
}
//IO is a static final Scheduler field; the static initializer block of Schedulers assigns it from an IOTask
static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}
//What is IoHolder.DEFAULT?
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}
//So IO ends up being a static instance of the IoScheduler class
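The way IO is created can be imitated in plain Java. The following is only a minimal sketch of the lazy holder idiom that the code above appears to use; SketchScheduler, SketchSchedulers and init are hypothetical names, and init is a simplified stand-in for RxJavaPlugins.initIoScheduler that ignores any plugin hook.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for a scheduler; not the RxJava type.
class SketchScheduler {
    static int created = 0; // counts constructor calls, for illustration only

    SketchScheduler() {
        created++;
    }
}

// Hypothetical imitation of how Schedulers builds its IO field.
class SketchSchedulers {
    // Nested holder: the JVM initializes it only when DEFAULT is first read.
    static final class IoHolder {
        static final SketchScheduler DEFAULT = new SketchScheduler();
    }

    // Mirrors IOTask: a Callable that hands back the holder's single instance.
    static final class IoTask implements Callable<SketchScheduler> {
        @Override
        public SketchScheduler call() {
            return IoHolder.DEFAULT;
        }
    }

    static final SketchScheduler IO;

    static {
        IO = init(new IoTask());
    }

    // Assumption: simplified stand-in for the plugin call, it only invokes the Callable.
    static SketchScheduler init(Callable<SketchScheduler> supplier) {
        try {
            return supplier.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    static SketchScheduler io() {
        return IO;
    }
}
```

Every call to SketchSchedulers.io() returns the same object, which matches the observation that IO is one static IoScheduler instance.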

So observeOn(Schedulers.io()) actually receives an instance of IoScheduler; in effect the call is observeOn(IoScheduler).
Let's take a look at the observeOn method.

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
......
     return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
//From the above code, we can see that an ObservableObserveOn instance is returned
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
  ......
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        ......
    }
}
//observeOn(AndroidSchedulers.mainThread()) works the same way. For now we only note that the scheduler behind mainThread() is HandlerScheduler, which we will analyze later
//Putting the code and the example together, the chain of Observables is: ObservableCreate -> ObservableDoOnEach -> ObservableObserveOn(IoScheduler) -> ObservableDoOnEach -> ObservableObserveOn(HandlerScheduler)

Next, subscribe(Observer) is called. As analyzed in the previous article, the subscribeActual(Observer<? super T> observer) methods are then executed one after another along the chain. Here we only need to look at the implementation in the ObservableObserveOn class, because the implementations in the other classes have already been analyzed.

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //For IoScheduler this returns an EventLoopWorker instance
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
//This method is similar to the ones analyzed before; only the arguments differ: the downstream observer is wrapped in an ObserveOnObserver together with the worker before it is passed upstream
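The wrapping done in subscribeActual can be sketched without RxJava. The code below is a rough illustration only: MiniObservable, just and MiniObserveOnDemo are hypothetical names, an ExecutorService stands in for Scheduler.Worker, and a Consumer stands in for Observer. It leaves out the queue, disposal and error handling of the real operator.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, heavily simplified stand-in for Observable; not the RxJava API.
abstract class MiniObservable<T> {
    // Each operator implements this, like subscribeActual in RxJava.
    abstract void subscribeActual(Consumer<? super T> observer);

    final void subscribe(Consumer<? super T> observer) {
        subscribeActual(observer);
    }

    // Wraps "this" in a new observable, similar to how observeOn returns ObservableObserveOn.
    final MiniObservable<T> observeOn(ExecutorService worker) {
        final MiniObservable<T> source = this;
        return new MiniObservable<T>() {
            @Override
            void subscribeActual(Consumer<? super T> downstream) {
                // The wrapping observer forwards every value on the worker's thread.
                source.subscribe(value -> worker.execute(() -> downstream.accept(value)));
            }
        };
    }

    static <T> MiniObservable<T> just(T value) {
        return new MiniObservable<T>() {
            @Override
            void subscribeActual(Consumer<? super T> observer) {
                observer.accept(value); // emitted on the subscribing thread
            }
        };
    }
}

class MiniObserveOnDemo {
    // Returns the name of the thread on which the downstream received the value.
    static String receivingThread() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        String[] seen = new String[1];
        MiniObservable.just("hello")
                .observeOn(worker)
                .subscribe(v -> seen[0] = Thread.currentThread().getName());
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS); // wait for the hand-off to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

In this sketch the value is emitted on the calling thread, but the downstream consumer sees it on the thread named sketch-io, which is the essence of what observeOn does for everything below it.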

Next, as analyzed in the previous article, onSubscribe(Disposable d) is called. Let's take a look at the onSubscribe method of ObserveOnObserver.

@Override
public void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        if (d instanceof QueueDisposable) {
            @SuppressWarnings("unchecked")
            QueueDisposable<T> qd = (QueueDisposable<T>) d;

            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

            if (m == QueueDisposable.SYNC) {
                sourceMode = m;
                queue = qd;
                done = true;
                downstream.onSubscribe(this);
                schedule();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                sourceMode = m;
                queue = qd;
                downstream.onSubscribe(this);
                return;
            }
        }
        //Normally execution reaches this point, unless the upstream is a QueueDisposable that supports fusion (for example, two observeOn operators connected directly)
        //SpscLinkedArrayQueue (a single-producer, single-consumer queue) stores the values passed to onNext(T value); its operations are atomic
        queue = new SpscLinkedArrayQueue<T>(bufferSize);

        downstream.onSubscribe(this);
    }
}

The next step is that onNext is called step by step along the chain. Let's take a look at the onNext method of ObserveOnObserver.

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}
//worker.schedule(this) calls the schedule method of EventLoopWorker. First, let's look at the createWorker method of IoScheduler, which created that worker.
@Override
public Worker createWorker() {
    //pool is an AtomicReference<CachedWorkerPool>; pool.get() returns the current CachedWorkerPool instance
    return new EventLoopWorker(pool.get());
}
//An instance of EventLoopWorker class is returned. Let's take a look at this class.
static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;
......

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        //CompositeDisposable holds a collection of Disposables
        this.tasks = new CompositeDisposable();
        //Get a ThreadWorker: first try the ConcurrentLinkedQueue cache; if it is empty, a new instance is created
        this.threadWorker = pool.get();
    }
......
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }

        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}
//schedule receives a Runnable as its parameter. We passed in the ObserveOnObserver itself, so the code that eventually runs is its run method:
@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
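The worker reuse described in the EventLoopWorker constructor above can be sketched roughly as follows. This is an illustration under assumptions, not the IoScheduler source: SketchThreadWorker and SketchWorkerPool are hypothetical names, release is an assumed counterpart of get that returns a worker to the cache, and the expiry of idle workers is left out.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for ThreadWorker: one single-threaded executor per worker.
class SketchThreadWorker {
    final ExecutorService executor = Executors.newSingleThreadExecutor();
}

// Hypothetical stand-in for CachedWorkerPool; expiry of idle workers is left out.
class SketchWorkerPool {
    private final ConcurrentLinkedQueue<SketchThreadWorker> idle =
            new ConcurrentLinkedQueue<>();

    // Reuse an idle worker if one is cached, otherwise create a new one.
    SketchThreadWorker get() {
        SketchThreadWorker cached = idle.poll();
        return cached != null ? cached : new SketchThreadWorker();
    }

    // Assumed counterpart of get(): put a worker back so a later get() can reuse it.
    void release(SketchThreadWorker worker) {
        idle.offer(worker);
    }

    // Shut down the executors of all workers that are currently idle.
    void shutdown() {
        SketchThreadWorker w;
        while ((w = idle.poll()) != null) {
            w.executor.shutdown();
        }
    }
}
```

A worker that has been released is handed out again by the next get(), while a get() on an empty cache creates a fresh worker with its own thread.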

What is the ThreadWorker class?

static final class ThreadWorker extends NewThreadWorker {
   ......

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }
    ......
}

//NewThreadWorker class
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

......

    public NewThreadWorker(ThreadFactory threadFactory) {
        //A single-threaded ScheduledExecutorService (the thread pool) is created here
        executor = SchedulerPoolFactory.create(threadFactory);
    }
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (disposed) {
        return EmptyDisposable.INSTANCE;
    }
    //Delegates to scheduleActual without a parent container
    return scheduleActual(action, delayTime, unit, null);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        //Submit the runnable to the thread pool here: immediately if there is no delay, otherwise after delayTime
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}
 }
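The submit/schedule branch in scheduleActual maps directly onto the standard ScheduledExecutorService API. Here is a small self-contained sketch; ScheduleActualSketch and runOnWorker are hypothetical names, and the one-thread pool is an assumption about what SchedulerPoolFactory.create provides.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the submit-or-schedule decision; not the NewThreadWorker source.
class ScheduleActualSketch {
    // Runs a task on a one-thread scheduled pool and returns the worker thread's name.
    static String runOnWorker(long delayMillis) {
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(1, r -> new Thread(r, "sketch-worker"));
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            Future<String> f;
            if (delayMillis <= 0) {
                // No delay: run as soon as the pool's thread is free.
                f = executor.submit(task);
            } else {
                // With a delay: the pool runs the task after the given time.
                f = executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            }
            return f.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Whether the task is submitted immediately or with a delay, it runs on the pool's own thread and never on the thread that called runOnWorker.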

To summarize, worker.schedule(this) obtains a ThreadWorker (creating its thread pool if necessary) and submits the ObserveOnObserver to that pool, where its run method is executed:

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
//Let's take a look at the drainNormal method
void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
            //Get value from queue
                v = q.poll();
            } catch (Throwable ex) {
           ......
                return;
            }
            ......
            //Execute the onNext method of downstream
            a.onNext(v);
            ......
         }
}
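The interplay of onNext, schedule() and the missed counter in drainNormal follows a queue-drain pattern. The sketch below shows that pattern in isolation, under assumptions: QueueDrainSketch and QueueDrainDemo are hypothetical names, a ConcurrentLinkedQueue replaces SpscLinkedArrayQueue, and termination and error handling are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the queue-drain pattern; names are illustrative, not RxJava's.
class QueueDrainSketch<T> extends AtomicInteger implements Runnable {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final ExecutorService worker;
    final List<T> delivered = new ArrayList<>(); // only touched by the drain loop

    QueueDrainSketch(ExecutorService worker) {
        this.worker = worker;
    }

    void onNext(T value) {
        queue.offer(value);
        schedule();
    }

    void schedule() {
        // Only the caller that moves the counter from 0 to 1 starts a drain.
        if (getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T v;
            while ((v = queue.poll()) != null) {
                delivered.add(v); // stands in for downstream.onNext(v)
            }
            // Subtract the schedule() calls handled so far; stop only if none arrived meanwhile.
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}

class QueueDrainDemo {
    // Pushes 0..count-1 through the sketch and returns what the drain loop delivered.
    static List<Integer> deliver(int count) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        QueueDrainSketch<Integer> sketch = new QueueDrainSketch<>(worker);
        for (int i = 0; i < count; i++) {
            sketch.onNext(i);
        }
        worker.shutdown(); // drains that were already submitted still run
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.delivered;
    }
}
```

Because only one drain loop can be active at a time, the values reach the downstream one by one and in the order in which they were offered, even though the producer never waits for the consumer.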

To summarize: the onNext method of ObserveOnObserver puts the value into the queue and submits the observer's run method to the thread pool of the ThreadWorker. The run method then calls the downstream onNext method, that is, the onNext of the next observer in the chain, on the pool's thread. Switching back to the main thread later works in the same way, except that the Runnable is wrapped in a Message and sent through the main thread's Handler, which executes the run method in the message and thereby calls the downstream onNext on the main thread.
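The main-thread case depends on Android's Handler and Looper, which cannot run outside Android. As a rough plain-Java imitation of the idea only, the sketch below uses a thread that takes Runnables from a blocking queue; MessageLoopSketch, post, quit and threadThatRuns are hypothetical names and not part of the Android or RxAndroid API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java imitation of a message loop; not the Android Looper/Handler API.
class MessageLoopSketch {
    private static final Runnable QUIT = () -> { };
    private final BlockingQueue<Runnable> messages = new LinkedBlockingQueue<>();
    private final Thread loopThread;

    MessageLoopSketch(String threadName) {
        loopThread = new Thread(() -> {
            try {
                for (;;) {
                    Runnable message = messages.take(); // block until a message arrives
                    if (message == QUIT) {
                        return;
                    }
                    message.run(); // always runs on the loop thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, threadName);
        loopThread.start();
    }

    // Plays the role of posting a Runnable through a Handler: enqueue work for the loop thread.
    void post(Runnable task) {
        messages.offer(task);
    }

    void quit() {
        messages.offer(QUIT);
    }

    // Posts one task and reports the name of the thread that ran it.
    static String threadThatRuns(String threadName) {
        MessageLoopSketch loop = new MessageLoopSketch(threadName);
        String[] seen = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        loop.post(() -> {
            seen[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            loop.quit();
        }
        return seen[0];
    }
}
```

Whichever thread calls post, the task runs on the loop thread. In the same way, a run method posted to the main thread's Handler ends up calling the downstream onNext on the main thread.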
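1. How does RxJava2 switch threads?
When a thread-switching operator such as observeOn is used, the Scheduler's worker (a ThreadWorker in the case of Schedulers.io()) creates a thread pool, and the run method of ObserveOnObserver is submitted to that pool for execution. Everything downstream then runs on the pool's thread.
2. How does RxJava2 keep threads synchronized?
Values are handed over through a queue, and schedule() only submits a new task when getAndIncrement() returns 0, so at most one drain loop runs at a time. That loop calls the downstream onNext for one value and moves on to the next value only after that call has returned.
3. How do Schedulers schedule threads?
This overlaps with the first question.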


Posted on Sun, 15 Mar 2020 01:24:52 -0700 by DeadlySin3