Advancing RxJava er TestScheduler for en blokkering samtale å utsette ()

stemmer
0

Bruke RxJava2 Jeg har en klasse som bygger bro over reaktiv til den ikke-reaktiv verden. I denne klasse er det en overstyrt metode, som returnerer en verdi etter en potensielt lang behandling . Denne behandling er innleiret i den reaktive verden ved hjelp av Single.create(emitter -> ...).

Behandlingstiden er avgjørende for meg, som det er en forskjell når det gjelder forretningslogikk, om en annen abonnent vises mens behandlingen er i gang eller etter at den er ferdig (eller kansellert).

Nå er jeg i å teste denne klassen - derfor trenger jeg den lange behandlingstiden for å være virtuelle bare . RxJava sin TestSchedulertil unnsetning!

For å opprette en test-gjennomføring for at brodannelse basisklassen, som emulerer forsinkelsen, Jeg trenger en blokkerende (!) Ring på den TestScheduler, hvor den fremrykkende i tid utløses på en egen tråd. (Ellers fremme i tid ville aldri bli utløst.)

Mitt problem er at jeg trenger den utrusende i tide til å bli forsinket minst til 'dyrt beregningen' blokkerer - ellers tid få er endret før forsinkelsen-operatør får aktiv og derfor det venter til en annen fremmarsj som aldri vil skje.

Jeg kan løse dette ved å ringe Thread.sleep(100)før anropet å avansere i tid - men det virker litt stygg til meg ... Hvor lenge å vente? For flere tester, legger til at tiden opp, men sviktende grunn av midlertidige problemer ... ugh.

Noen anelse om hvordan å teste denne situasjonen på en forhåpentligvis mer ren måte? Takk!

import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;

import java.util.concurrent.TimeUnit;

public class TestSchedulerBlocking {

    public static void main(String[] args) throws Exception {

        TestScheduler testScheduler = new TestScheduler();

        // the task - in reality, this is a task with a overridden method returning some value, where I need to delay the return in a test-implementation.
        Single<String> single = Single.create(emitter -> {

            System.out.println(emitting on  + Thread.currentThread() +  ...);

            // This is the core of my problem: I need some way to test a (synchronous) delay before the next line executes
            // (e.g. method returns, or, in another case, an exception is thrown).
            // Thread.sleep(<delay>) would be straight forward, but I want to use TestScheduler's magic time-advancing in my tests...
            // Using the usual non-blocking methods of RX, everything works fine for testing using the testScheduler, but here it doesn't help.
            // Here I need to synchronize it somehow, that the advancing in time is done *after* this call is blocking.
            // I tried a CountDownLatch in doOnSubscribe, but apparently, that's executed too early for me - I'd need doAfterSubscribe or so...
            // Final word on architecture: I'm dealing with the bridging of the reactive to the non-reactive world, so I can't just change the architecture.
            Completable.complete()
                       .delay(3, TimeUnit.SECONDS, testScheduler)
                       .doOnSubscribe(__ -> System.out.println(onSubcribe: completable))
                       .blockingAwait();

            System.out.println(<< blocking released >>);

            // this has to be delayed! Might be, for example, a return or an exception in real world.
            emitter.onSuccess(FooBar!);

        });


        System.out.println(running the task ...);
        TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println(onSubcribe: single))
                                                  .subscribeOn(Schedulers.newThread())
                                                  .test();

        // Without this sleep, the advancing in testScheduler's time is done *before* the doIt() is executed,
        // and therefore the blockingAwait() blocks until infinity...
        System.out.println(---SLEEPING---);
        Thread.sleep(100);

        // virtually advance the blocking delay 
        System.out.println(---advancing time---);
        testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);

        // wait for the task to complete
        System.out.println(await...);
        testObserver.await();

        // assertions on task results, etc.
        System.out.println(finished. now do some testing... values (expected [FooBar!]):  + testObserver.values());

    }

}

[Merk: Jeg stilte dette spørsmålet i en mer kompleks og detaljert måte på i går - men som det var for varmt på kontoret mitt og det hadde flere feil og feil. Derfor har jeg slettet den og nå er det mer kondensert til det virkelige problemet.]

Publisert på 19/09/2018 klokken 13:31
kilden bruker
På andre språk...                            


1 svar

stemmer
0

Hvis jeg forstår problemet ditt riktig, har du en langvarig service, og du vil være i stand til å teste abonnementer til tjenester som vil bli berørt av den langvarige tjeneste. Jeg har nærmet seg dette på følgende måte.

Definere tjenesten som du har, ved hjelp av en TestSchedulerfor å kontrollere sin fremgang i tid. Deretter definerer teststimuli å bruke samme test planleggeren.

Denne emulerer blokkering enhet. Ikke bruk "block".

Single<String> single = Observable.timer( 3, TimeUnit.SECONDS, scheduler )
                          .doOnNext( _l -> {} )
                          .map( _l -> "FooBar!" )
                          .take( 1 )
                          .toSingle();

// set up asynchronous operations
Observable.timer( 1_500, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction1() );
Observable.timer( 1_900, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction2() );
Observable.timer( 3_100, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction3() );
System.out.println("running the task ...");
TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println("onSubcribe: single"))
                                            .subscribeOn(Schedulers.newThread())
                                            .test();

// virtually advance the blocking delay 
System.out.println("---advancing time---");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);

Du skal se at performTestAction1()skjer, så performTestAction2()da singleer ferdig, da performTestAction3(). Alle som er drevet av testen planleggeren.

Svarte 20/09/2018 kl. 15:00
kilden bruker

Cookies help us deliver our services. By using our services, you agree to our use of cookies. Learn more