rxjava-2.2.2.jar
登录
|
io.reactivex.rxjava2:rxjava:2.2.2
io
reactivex
SingleOnSubscribe.java
FlowableOperator.java
package-info.java
CompletableOnSubscribe.java
Notification.java
MaybeEmitter.java
Single.java
internal
SingleTransformer.java
Maybe.java
SingleOperator.java
observables
BackpressureStrategy.java
MaybeOperator.java
subjects
package-info.java
Subject.java
CompletableSubject.java
ReplaySubject.java
MaybeSubject.java
PublishSubject.java
AsyncSubject.java
SerializedSubject.java
SingleSubject.java
UnicastSubject.java
BehaviorSubject.java
ObservableTransformer.java
plugins
SingleSource.java
ObservableConverter.java
MaybeObserver.java
observers
FlowableTransformer.java
disposables
Scheduler.java
ObservableEmitter.java
subscribers
parallel
CompletableSource.java
FlowableOnSubscribe.java
SingleObserver.java
BackpressureOverflowStrategy.java
MaybeOnSubscribe.java
schedulers
flowables
Emitter.java
MaybeConverter.java
SingleEmitter.java
exceptions
FlowableSubscriber.java
MaybeSource.java
Observer.java
SingleConverter.java
functions
processors
CompletableEmitter.java
FlowableConverter.java
CompletableConverter.java
annotations
CompletableTransformer.java
CompletableObserver.java
ObservableOnSubscribe.java
MaybeTransformer.java
ObservableOperator.java
CompletableOperator.java
Flowable.java
Completable.java
Observable.java
FlowableEmitter.java
ObservableSource.java
META-INF
SerializedSubject.java
清空
类结构
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */ package io.reactivex.subjects; import io.reactivex.
Observer
; import io.reactivex.annotations.
Nullable
; import io.reactivex.disposables.
Disposable
; import io.reactivex.internal.util.*; import io.reactivex.internal.util.
AppendOnlyLinkedArrayList
.
NonThrowingPredicate
; import io.reactivex.plugins.
RxJavaPlugins
; /** * Serializes calls to the Observer methods. * <p>All other Observable and Subject methods are thread-safe by design. * * @param <T> the item value type */ /* public */ final class
SerializedSubject
<T> extends
Subject
<T> implements
NonThrowingPredicate
<
Object
> { /** The actual subscriber to serialize Subscriber calls to. */ final
Subject
<T>
actual
; /** Indicates an emission is going on, guarded by this. */ boolean
emitting
; /** If not null, it holds the missed NotificationLite events. */
AppendOnlyLinkedArrayList
<
Object
>
queue
; /** Indicates a terminal event has been received and all further events will be dropped. */ volatile boolean
done
; /** * Constructor that wraps an actual subject. * @param actual the subject wrapped */
SerializedSubject
(final
Subject
<T>
actual
) { this.
actual
=
actual
; } @
Override
protected void
subscribeActual
(
Observer
<? super T>
observer
) {
actual
.
subscribe
(
observer
); } @
Override
public void
onSubscribe
(
Disposable
d
) { boolean
cancel
; if (!
done
) { synchronized (this) { if (
done
) {
cancel
= true; } else { if (
emitting
) {
AppendOnlyLinkedArrayList
<
Object
>
q
=
queue
; if (
q
== null) {
q
= new
AppendOnlyLinkedArrayList
<
Object
>(4);
queue
=
q
; }
q
.
add
(
NotificationLite
.
disposable
(
d
)); return; }
emitting
= true;
cancel
= false; } } } else {
cancel
= true; } if (
cancel
) {
d
.
dispose
(); } else {
actual
.
onSubscribe
(
d
);
emitLoop
(); } } @
Override
public void
onNext
(T
t
) { if (
done
) { return; } synchronized (this) { if (
done
) { return; } if (
emitting
) {
AppendOnlyLinkedArrayList
<
Object
>
q
=
queue
; if (
q
== null) {
q
= new
AppendOnlyLinkedArrayList
<
Object
>(4);
queue
=
q
; }
q
.
add
(
NotificationLite
.
next
(
t
)); return; }
emitting
= true; }
actual
.
onNext
(
t
);
emitLoop
(); } @
Override
public void
onError
(
Throwable
t
) { if (
done
) {
RxJavaPlugins
.
onError
(
t
); return; } boolean
reportError
; synchronized (this) { if (
done
) {
reportError
= true; } else {
done
= true; if (
emitting
) {
AppendOnlyLinkedArrayList
<
Object
>
q
=
queue
; if (
q
== null) {
q
= new
AppendOnlyLinkedArrayList
<
Object
>(4);
queue
=
q
; }
q
.
setFirst
(
NotificationLite
.
error
(
t
)); return; }
reportError
= false;
emitting
= true; } } if (
reportError
) {
RxJavaPlugins
.
onError
(
t
); return; }
actual
.
onError
(
t
); } @
Override
public void
onComplete
() { if (
done
) { return; } synchronized (this) { if (
done
) { return; }
done
= true; if (
emitting
) {
AppendOnlyLinkedArrayList
<
Object
>
q
=
queue
; if (
q
== null) {
q
= new
AppendOnlyLinkedArrayList
<
Object
>(4);
queue
=
q
; }
q
.
add
(
NotificationLite
.
complete
()); return; }
emitting
= true; }
actual
.
onComplete
(); } /** Loops until all notifications in the queue has been processed. */ void
emitLoop
() { for (;;) {
AppendOnlyLinkedArrayList
<
Object
>
q
; synchronized (this) {
q
=
queue
; if (
q
== null) {
emitting
= false; return; }
queue
= null; }
q
.
forEachWhile
(this); } } @
Override
public boolean
test
(
Object
o
) { return
NotificationLite
.
acceptFull
(
o
,
actual
); } @
Override
public boolean
hasObservers
() { return
actual
.
hasObservers
(); } @
Override
public boolean
hasThrowable
() { return
actual
.
hasThrowable
(); } @
Override
@
Nullable
public
Throwable
getThrowable
() { return
actual
.
getThrowable
(); } @
Override
public boolean
hasComplete
() { return
actual
.
hasComplete
(); } }
查找资源
Jre/Lib
输入类名或文件名
类结构窗口