13 Ekim 2017 Cuma

Temel RxJava (3)

Operatörler

Rxjava yı geleneksel java koduna göre bir adım öne geçiren olayın , yayınlanan datayı işlemek için çok geniş metod seti olduğundan ilk yazdımda bahsetmiştim . Bu yazımda bunlardan birkaçını açıklamaya çalışacağım .

flatMap()

Bir observable ın sonucunda oluşan değer tekrar başka bir observable a bağlanıyorsa . Yani asenkron bir işlem sonucu aslında başka bir asenkron işlem sonucuna bağlıysa içiçe iki tane observable yazmaktansa flatmap kullanılır .


Single.fromCallable(new Callable() {
            @Override
            public Integer call() throws Exception {
                return 1;
                //return 2;
            }
        }).flatMap(new Function>() {
            @Override
            public SingleSource apply(Integer integer) throws Exception {

                if (integer == 1)
                    return Single.just("fisrt");
                else
                    return Single.just("second");
            }
        }).subscribe(new Consumer() {
            @Override
            public void accept(String s) throws Exception {
                Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
            }
        });
Ekran Çıktısı : "first"
Örnekte ilk olarak Integer dönen bir observable oluşturulmuştur . Başka bir observable bunu dinlemektedir ve dönen sonuca göre String bir değer dönmektedir .

reduce()

Bir Observable birden çok değer üretiyor ve sonuc olarak bu değerlerin toplamı lazımsa reduce kullanılabilir .


  Observable.create(new ObservableOnSubscribe>() {
            @Override
            public void subscribe(ObservableEmitter> e) throws Exception {


                ArrayList strings = new ArrayList<>();
                strings.add("Hello");
                strings.add("Bro");
                e.onNext(strings);

                strings = new ArrayList<>();
                strings.add("Whats");
                strings.add("Up");
                e.onNext(strings);
                e.onComplete();

            }
        }).reduce(new BiFunction, List, List>() {
            @Override
            public List apply(List strings, List strings2) throws Exception {
                strings.addAll(strings2);
                return strings;
            }
        }).subscribe(new Consumer>() {
            @Override
            public void accept(List strings) throws Exception {
                Toast.makeText(MainActivity.this, strings.toString(), Toast.LENGTH_SHORT).show();
            }
        });

Ekran Çıktısı : [Hello,Bro,Whatsup,Kro]

onNext ile yayınlanan her "strings" listesi reduce methoduna girer ve orada parent strings e eklenir . Subsribe ise onComplete den sonra çalışır ve sonuç olarak iki onNext metoduyla yayınlanmış olan listelerin artarda eklenmiş halini alır .

Temel RxJava (2)

Observable - Single - Completable - Maybe

Bir önceki yazımda observable nedir nasıl kullanılırdan bahsetmiştim . Şimdi ise Observable yerine kullanılabilecek diğer dinleyici türlerine bir göz atalım .

Observable -> Birden fazla kez dinleyici tetiklenecek ise kullanılır . Bu yüzden onNext , onComplete adında iki methodu vardır . Her yayınlamada onNext tetiklenirken tüm akış bittiğinde ise onComplete tetiklenir .


Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {

                e.onNext("Hello");
                e.onNext("Ardahan");
                e.onComplete();

            }
        }).subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(Object o) {
                Log.d(Tag, o);
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {}
        });

Single -> Dinleyici sadece bir kez tetiklecek ise kullanılır . Bu yüzden onSuccess methodu mevcuttur .


Single.create(new SingleOnSubscribe() {
            @Override
            public void subscribe(SingleEmitter e) throws Exception {
                e.onSuccess("Hello");

            }
        }).subscribe(new SingleObserver() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onSuccess(Object o) {
                Log.d(Tag, o);
            }

            @Override
            public void onError(Throwable e) {}
        });

Completable - > Single ile aynı gibi görünsede burada dinleyici tetiklenirken observable dan herhangi bir değer beklenmez . Arkaplanda bir işlem gerçekleştirilir ama sadece bitip bitmediğiyle ilgilenilir . 


Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter e) throws Exception {
                e.onComplete();
            }
        }).subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onComplete() {
                Log.d(Tag, "done");
            }

            @Override
            public void onError(Throwable e) {}
        });

Maybe -> Single ile Completable birleşimi gibidir . Hem onSuccess hem de onComplete methodları mevcut . Yani hiçbir sonuç dönmeyedebilir , tek bir sonuç dönedebilir .


Maybe.create(new MaybeOnSubscribe() {
            @Override
            public void subscribe(MaybeEmitter e) throws Exception {
                e.onSuccess("Hello");
               // e.onComplete();
            }
        }).subscribe(new MaybeObserver() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onSuccess(String s) {
                Log.d(Tag, s);
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {
                Log.d(Tag, "done");
            }
        });

Temel RxJava (1)

RxJava Nedir ?

RxJava (Reactive Extension) en temelde observable pattern üzerine kurulu asenkron işlemler için kullanılan güçlü bir kütüphanedir . Kendi içinde 3 parçaya ayrılır .

Observable (Gözlenen) - Observer (Gözlemci) - Datayı işleyen methodlar 

Observable = Datanın yayınlandığı yer
Observer = Datanın dinlendiği yer
Datayı İşleme = Data üzerinde filtreleme , ekleme , çıkarma vs.. işlemlerin yapıldığı yer

Observable.just(" Hello World ").map(new Function() {
            @Override
            public String apply(String s) throws Exception {
                return s.trim();
            }
        }).subscribe(new Consumer() {
            @Override
            public void accept(String s) throws Exception {
                Log.d("", s);
            }
        });


Ekran Çıktısı : HelloWorld

Yukarıdaki örnekte en basit haliyle bir örnek görebilirisiniz . "Hello World" yaylınlanıyor -> trim() ile filtreleniyor -> subscribe edilen yerde sonuç görüntüleniyor .

Neden RxJava Kullanmalıyım ?

Yukarıdaki örnekte bir string yayınlanmış onu da bir yerden dinlemiş basmışsın , bunun için mi yani bu rxjava diyebilirsiniz :) haklısınız . Peki ama neden neden kullanmalıyım diye sorarsanız !

(Java)


 new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                final List strings = new ArrayList<>();
                strings.add("Hello");

                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        Toast.makeText(MainActivity.this, strings.toString(), Toast.LENGTH_SHORT).show();
                    }
                });


            }
        }).start();

Yapmak istediğim çok basit , bana bir liste dönecek olan bir işlem ama 500 milisaniyelik bir gecikme ile cevap veriyor . Cihazda UI kitlemeden arkaplanda bir işlem yapacağınızı düşünün ki bunu yapmak için yeni bir thread açmanız gerekiyor -Örnek amaçlı ben sonucu liste dönen bir işleme Thread.sleep(500) ile bir gecikme ekledim -. Ardından sonucu ekrana basacığınızı düşünün ki bunun içinde tekrar UI thread e geçmeniz yani bir runOnUiThread başlatmanız gerekiyor -Aksi takdirde exception alırsınız -.

Bunun yerine

(RxJava)

Observable.fromCallable(new Callable>() {
            @Override
            public List call() throws Exception {

                Thread.sleep(500);

                List strings = new ArrayList<>();
                strings.add("Hello");

                return strings;
            }
        }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer>() {
            @Override
            public void accept(List strings) throws Exception {
                Toast.makeText(MainActivity.this, strings.toString(), Toast.LENGTH_SHORT).show();
            }
        });


ObserveOn = Subsriber hangi threadde çalışması dinlemesi gerektiğini belirtiyoruz . -AndroidSchedulers.mainThread() yani main threadde dinle . -
SubscribeOn = Observable sonucu yayınlanacak olacak işlemin hangi threadde çalışması gerektiğini belirtiyoruz . -Ben newThread() kullandım ama Schedulers.io da aynı işi yapıyor sadece var olan ama boşa çıkmış threadlerden birini kullanıyor .-

Bunlara ek olarak RxJava ile data üzerinde işlem yapabileceğiniz çok güçlü bir method set (Operators) olduğunu hesaba katarsak . RxJava geleneksel java koduna göre daha baskın geliyor .


Makalenin devamını linkten bulabilirisniz