Handwritten RxJava event transformation (the map operator)

First of all, here is how the real RxJava map operator is used:

 // Emit the image URL, download and decode it on an I/O thread, add a watermark,
 // then hand the finished bitmap to the ImageView on the Android main thread.
 Observable.just("http://d.paper.i4.cn/max/2016/10/10/15/1476085552237_716159.jpg")
                 .map(new Function<String, Bitmap>() {
                     @Override
                     public Bitmap apply(@NonNull String s) throws Exception {
                         // Use the emitted URL rather than repeating the literal, so the
                         // mapper really transforms whatever the upstream emits.
                         URL url = new URL(s);
                         HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                         connection.connect();
                         // Close the stream and release the connection even if decoding fails.
                         try (InputStream inputStream = connection.getInputStream()) {
                             return BitmapFactory.decodeStream(inputStream);
                         } finally {
                             connection.disconnect();
                         }
                     }
                 })
                 .map(new Function<Bitmap, Bitmap>() {
                     @Override
                     public Bitmap apply(@NonNull Bitmap bitmap) throws Exception {
                         // Second transformation: Bitmap -> watermarked Bitmap.
                         return createWatermark(bitmap, "rxjava");
                     }
                 })
                 .subscribeOn(Schedulers.io())
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribe(new Consumer<Bitmap>() {
                     @Override
                     public void accept(Bitmap bitmap) throws Exception {
                         mImage.setImageBitmap(bitmap);
                     }
                 });

Now let's analyze the source code.

just source code analysis

    /**
     * Wraps the given item in an ObservableJust and returns it through
     * RxJavaPlugins.onAssembly. (Quoted from the RxJava source.)
     */
    public static <T> Observable<T> just(T item) {
        // Null check on the item, with the message "The item is null"
        ObjectHelper.requireNonNull(item, "The item is null");
        // Wrap the item (the String URL in the example above) in an ObservableJust object
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

Next, let's look at the source code behind subscribe(new Consumer()).

Consumer source code analysis

/**
 * Callback interface with a single method that receives one value of type T.
 * (Quoted from the RxJava source.)
 */
public interface Consumer<T> {
     // Generic interface: in the example above, T is the type left after the map conversions (Bitmap)
    void accept(T t) throws Exception;
}

subscribe source code analysis

 // Single-callback overload: delegates to the four-argument overload below, passing
 // Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION and Functions.emptyConsumer()
 // for the callbacks the caller did not supply. (Quoted from the RxJava source.)
 public final Disposable subscribe(Consumer<? super T> onNext) {
 return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
// Four-callback overload: null-checks every callback, wraps them in a LambdaObserver,
// subscribes that observer, and returns it as the Disposable.
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
         //Wrap the incoming callbacks in a LambdaObserver. Put simply, the Consumer we passed in takes the place of an observer's onNext method
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
         //Call the Observer-based subscribe method, which was analyzed in the previous article
        subscribe(ls);

        return ls;
    }

Handwritten subscribe method

First, write the observer interface

/**
 * Handwritten observer: the four callbacks an Observable invokes on its subscriber.
 *
 * @param <T> type of the values received in onNext
 */
public interface Observer<T>  {
    // Called by the source when the subscription is set up (this simplified version passes no Disposable)
    void onSubscribe();
    // Called with each emitted value
    void onNext(@NonNull T s);
    // Called with the failure when emission goes wrong
    void onError(@NonNull Throwable e);
    // Called once the source has nothing more to emit
    void onComplete();
}

Then write the subscription interface ObservableSource

/**
 * Handwritten subscription contract: anything an Observer can subscribe to.
 *
 * @param <T> type of the values the source delivers
 */
public interface ObservableSource<T> {
    // Accepts any observer able to consume T (hence the "? super T" wildcard)
    void subscribe(@NonNull Observer<? super T> observer);
}

ObservableJust object encapsulation

/**
 * Handwritten Observable that holds one item and, when subscribed to, delivers
 * onSubscribe, onNext(item) and onComplete to the observer synchronously on the
 * calling thread.
 *
 * @param <T> type of the single emitted item
 */
public class ObservableJust<T> extends Observable<T> {
    private final T item;

    /**
     * @param item the value handed to every subscriber's onNext
     */
    public ObservableJust(T item) {
        this.item = item;
    }

    /**
     * Pairs the observer with the stored item and runs the emission immediately.
     *
     * @param observer the subscriber that receives the item
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // Encapsulate observer and value
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, item);
        observer.onSubscribe();
        sd.run();
    }

    /**
     * Performs the actual emission of one value to one observer.
     * Static and fully parameterized: it needs no reference to the enclosing
     * instance, and its type parameter no longer shadows the outer class's T.
     */
    private static final class ScalarDisposable<T> {
        private final Observer<? super T> observer;
        private final T item;

        ScalarDisposable(Observer<? super T> observer, T item) {
            this.observer = observer;
            this.item = item;
        }

        /** Emits the item followed by completion; an Exception thrown on the way is routed to onError. */
        void run() {
            try {
                observer.onNext(item);
                observer.onComplete();
            } catch (Exception e) {
                observer.onError(e);
            }
        }
    }
}

Then use it from MainActivity:

 // Build the handwritten Consumer first, then subscribe it to a single-item Observable;
 // the emitted URL string is simply written to the log.
 Consumer<String> logItem = new Consumer<String>() {
     @Override
     public void onNext(String item) {
         Log.e("TAG",item);
     }
 };
 Observable.just("http://d.paper.i4.cn/max/2016/10/10/15/1476085552237_716159.jpg")
         .subscribe(logItem);

Consumer interface

/**
 * Handwritten, simplified consumer: a single callback that receives each emitted item.
 *
 * @param <T> type of the received item
 */
public interface Consumer<T>  {
    // Named onNext here (RxJava's own Consumer calls it accept)
    void onNext(T item);
}

Observable

/**
 * Handwritten base class of every observable in this tutorial. Subclasses supply
 * subscribeActual; this class offers the just factory and the subscribe overloads.
 *
 * @param <T> type of the emitted items
 */
public abstract class Observable<T> implements ObservableSource<T> {

    /**
     * Creates an observable that holds a single item.
     *
     * @param item the item to emit
     * @return an ObservableJust wrapping the item
     */
    public static <T> Observable<T> just(T item) {
        return onAssembly(new ObservableJust<T>(item));
    }

    /**
     * Assembly hook; this simplified version returns its argument unchanged.
     * It accepts any Observable (not only ObservableJust) so that operator
     * classes can be passed through it as well.
     */
    private static <T> Observable<T> onAssembly(Observable<T> source) {
        return source;
    }

    /**
     * Subscribes the observer by delegating to the subclass's subscribeActual.
     *
     * @param observer the observer to subscribe
     */
    @Override
    public void subscribe(@NonNull Observer<? super T> observer) {
        subscribeActual(observer);
    }

    /**
     * The real subscription logic, implemented by each concrete observable.
     *
     * @param observer the observer to deliver events to
     */
    protected abstract void subscribeActual(Observer<? super T> observer);

    /**
     * Convenience overload: wraps the consumer in a LambdaObserver and subscribes that.
     *
     * @param consumer callback that receives each emitted item
     */
    public void subscribe(Consumer<T> consumer) {
        // Explicit type argument instead of the raw type keeps the call type-checked.
        subscribe(new LambdaObserver<T>(consumer));
    }
}

LambdaObserver

/**
 * Adapter that lets a plain Consumer act as an Observer: onNext is forwarded to
 * the consumer, every other callback is deliberately a no-op.
 *
 * @param <T> type of the received items
 */
public class LambdaObserver<T> implements Observer<T> {
    private final Consumer<T> consumer;

    /**
     * @param consumer callback that receives each item; must not be null
     * @throws NullPointerException if consumer is null
     */
    public LambdaObserver(Consumer<T> consumer) {
        // Fail at construction time instead of on the first emitted item.
        if (consumer == null) {
            throw new NullPointerException("consumer is null");
        }
        this.consumer = consumer;
    }

    @Override
    public void onSubscribe() {
        // Intentionally empty: a Consumer has no subscription callback.
    }

    @Override
    public void onNext(@NonNull T s) {
        consumer.onNext(s);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        // Intentionally empty: this simplified adapter has no error callback,
        // so errors reaching it are dropped.
    }

    @Override
    public void onComplete() {
        // Intentionally empty: a Consumer has no completion callback.
    }
}

Source code analysis of Map operator

First, the Function interface that is passed to map(new Function<T, R>() { ... }):

/**
 * Conversion callback: takes a value of type T and returns a value of type R.
 * (Quoted from the RxJava source.)
 */
public interface Function<T, R> {
    //T is the input type we pass in (String in the example); R is the target type (Bitmap in the example)
    R apply(@NonNull T t) throws Exception;
}

map source code analysis

  // Returns a new ObservableMap built from this observable and the mapper,
  // passed through RxJavaPlugins.onAssembly. (Quoted from the RxJava source.)
  public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //Wrap the current observable (this) and the mapper function in an ObservableMap, which is itself an ObservableSource
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

Look at the ObservableMap class: it overrides subscribeActual.

   // Subscribes to the upstream source with a MapObserver that wraps the
   // downstream observer t and the mapping function. (Quoted from the RxJava source.)
   public void subscribeActual(Observer<? super U> t) {
         //The downstream observer is wrapped in the proxy object MapObserver, and the proxy is what gets subscribed to the source
        source.subscribe(new MapObserver<T, U>(t, function));
    }

Handwritten map operator

First, let's see how it is used in MainActivity:

  // The handwritten Observable has no schedulers: subscribe runs the whole chain
  // synchronously on the calling thread, so the network download is started from
  // a worker thread and the result is posted back with runOnUiThread.
  new Thread(new Runnable() {
            @Override
            public void run() {
                Observable.just("http://img.taopic.com/uploads/allimg/130331/240460-13033106243430.jpg")
                        .map(new Function<String, Bitmap>() {
                            @Override
                            public Bitmap apply(String urlPath) throws Exception {
                                // The fifth step
                                URL url = new URL(urlPath);
                                HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
                                // Close the stream and release the connection even if decoding fails.
                                try (InputStream inputStream = urlConnection.getInputStream()) {
                                    return BitmapFactory.decodeStream(inputStream);
                                } finally {
                                    urlConnection.disconnect();
                                }
                            }
                        })
                        .map(new Function<Bitmap, Bitmap>() {
                            @Override
                            public Bitmap apply(@NonNull Bitmap bitmap) throws Exception {
                                // Bitmap -> watermarked Bitmap
                                bitmap = createWatermark(bitmap, "RxJava2.0");
                                return bitmap;
                            }
                        })
                        .map(new Function<Bitmap, Bitmap>() {
                            @Override
                            public Bitmap apply(Bitmap bitmap) throws Exception {
                                // Identity step: only shows that map calls can be chained freely.
                                return bitmap;
                            }
                        })
                        .subscribe(new Consumer<Bitmap>() {
                            @Override
                            public void onNext(final Bitmap bitmap) {
                                // The seventh step
                                Log.e("TAG", "item = " + bitmap);
                                // Views may only be touched on the UI thread.
                                runOnUiThread(new Runnable() {
                                    @Override
                                    public void run() {
                                        mImage.setImageBitmap(bitmap);
                                    }
                                });
                            }
                        });
            }
        }).start();

Function interface

/**
 * Handwritten conversion callback used by map.
 *
 * @param <T> the input type
 * @param <R> the target (result) type
 */
//T is the source type; R is the target type.
public interface Function<T,R> {
    // Converts t into a value of the target type; may throw a checked Exception
    R apply(T t) throws Exception;
}

Code added to the Observable class:

/**
 * Returns a new observable that converts every item of this observable with the given function.
 *
 * @param function conversion from T (e.g. String) to R (e.g. Bitmap)
 * @param <R> the target type
 * @return an ObservableMap whose upstream is this observable
 */
public <R> Observable<R> map(Function<T, R> function) {
        // Wrap this observable (the upstream) together with the conversion in an ObservableMap
        ObservableMap<T, R> mapped = new ObservableMap<T, R>(this, function);
        return onAssembly(mapped);
    }

ObservableMap class

/**
 * Handwritten map operator: an Observable of R that subscribes to an upstream
 * Observable of T and converts each item with a Function before passing it on.
 *
 * @param <T> type emitted by the upstream
 * @param <R> type delivered downstream
 */
public class ObservableMap<T, R> extends Observable<R> {
    final Observable<T> source;// Observable in front (the upstream)
    final Function<T, R> function;// Current conversion

    /**
     * @param source   the upstream observable
     * @param function conversion applied to every upstream item
     */
    public ObservableMap(Observable<T> source, Function<T, R> function) {
        this.source = source;
        this.function = function;
    }

    /**
     * Subscribes to the upstream with a proxy observer that converts each item
     * and forwards the result to the real (downstream) observer.
     *
     * @param observer the downstream observer
     */
    @Override
    protected void subscribeActual(Observer<? super R> observer) {
        source.subscribe(new MapObserver<T, R>(observer, function));
    }

    /**
     * Proxy observer placed between the upstream and the downstream observer.
     * Static and fully parameterized: it needs no reference to the enclosing
     * instance, and its type parameters no longer shadow the outer class's.
     */
    private static final class MapObserver<T, R> implements Observer<T> {

        private final Observer<? super R> observer;
        private final Function<T, R> function;
        // Set once a terminal event (onError/onComplete) has been forwarded, so the
        // downstream never receives a second terminal event or a late item.
        private boolean done;

        MapObserver(Observer<? super R> observer, Function<T, R> function) {
            this.observer = observer;
            this.function = function;
        }

        @Override
        public void onSubscribe() {
            observer.onSubscribe();
        }

        @Override
        public void onNext(@NonNull T s) {
            if (done) {
                return;
            }
            try {
                R applyR = function.apply(s);
                observer.onNext(applyR);
            } catch (Exception e) {
                // Printed so the failure stays visible even when the downstream
                // observer does nothing in onError.
                e.printStackTrace();
                // Mark the stream terminated: the upstream will still call
                // onComplete after this onNext returns, and that must be ignored.
                done = true;
                observer.onError(e);
            }
        }

        @Override
        public void onError(@NonNull Throwable e) {
            if (done) {
                return;
            }
            done = true;
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            observer.onComplete();
        }
    }
}

Posted on Sun, 05 Apr 2020 10:22:13 -0700 by Isoss