package reactor.netty.resources;

import com.facebook.common.time.Clock;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import p83.c;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.ie;
import reactor.core.publisher.pa;
import reactor.core.publisher.sf;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.NettyPipeline;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
import reactor.netty.internal.shaded.reactor.pool.PooledRef;
import reactor.netty.internal.shaded.reactor.pool.PooledRefMetadata;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.DefaultPooledConnectionProvider;
import reactor.netty.resources.PooledConnectionProvider;
import reactor.netty.transport.TransportConfig;
import reactor.netty.transport.TransportConnector;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes10.dex */
public final class DefaultPooledConnectionProvider extends PooledConnectionProvider<PooledConnection> {
    static final r83.a log = r83.b.a(DefaultPooledConnectionProvider.class);
    static final AttributeKey<ConnectionObserver> OWNER = AttributeKey.valueOf("connectionOwner");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes10.dex */
    public static final class DisposableAcquire implements ConnectionObserver, Runnable, p83.b<PooledRef<PooledConnection>>, p83.c {
        final c.a cancellations;
        final s83.h currentContext;
        final ConnectionObserver obs;
        final ChannelOperations.OnSetup opsFactory;
        final long pendingAcquireTimeout;
        final InstrumentedPool<PooledConnection> pool;
        PooledRef<PooledConnection> pooledRef;
        final boolean retried;
        final ie<Connection> sink;
        Subscription subscription;

        DisposableAcquire(ConnectionObserver connectionObserver, ChannelOperations.OnSetup onSetup, long j14, InstrumentedPool<PooledConnection> instrumentedPool, ie<Connection> ieVar, s83.h hVar) {
            this.cancellations = p83.d.a();
            this.currentContext = hVar;
            this.obs = connectionObserver;
            this.opsFactory = onSetup;
            this.pendingAcquireTimeout = j14;
            this.pool = instrumentedPool;
            this.retried = false;
            this.sink = ieVar;
        }

        DisposableAcquire(DisposableAcquire disposableAcquire) {
            this.cancellations = disposableAcquire.cancellations;
            this.currentContext = disposableAcquire.currentContext;
            this.obs = disposableAcquire.obs;
            this.opsFactory = disposableAcquire.opsFactory;
            this.pendingAcquireTimeout = disposableAcquire.pendingAcquireTimeout;
            this.pool = disposableAcquire.pool;
            this.retried = true;
            this.sink = disposableAcquire.sink;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static /* synthetic */ void lambda$registerClose$2(Channel channel, InstrumentedPool instrumentedPool) {
            if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                PooledConnectionProvider.logPoolState(channel, instrumentedPool, "Channel closed");
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static /* synthetic */ void lambda$registerClose$3(final Channel channel, final InstrumentedPool instrumentedPool, Future future) throws Exception {
            ConnectionObserver connectionObserver = (ConnectionObserver) channel.attr(DefaultPooledConnectionProvider.OWNER).get();
            if (connectionObserver instanceof DisposableAcquire) {
                ((DisposableAcquire) connectionObserver).pooledRef.invalidate().subscribe(null, null, new Runnable() { // from class: reactor.netty.resources.g
                    @Override // java.lang.Runnable
                    public final void run() {
                        DefaultPooledConnectionProvider.DisposableAcquire.lambda$registerClose$2(Channel.this, instrumentedPool);
                    }
                });
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public /* synthetic */ void lambda$run$0(Channel channel) {
            if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                PooledConnectionProvider.logPoolState(channel, this.pool, "Channel closed");
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static /* synthetic */ s83.h lambda$run$1(Channel channel, s83.h hVar) {
            return hVar.put("callereventloop", channel.eventLoop());
        }

        @Override // reactor.netty.ConnectionObserver
        public s83.h currentContext() {
            return this.currentContext;
        }

        @Override // p83.c
        public void dispose() {
            this.subscription.cancel();
        }

        @Override // p83.c
        public /* bridge */ /* synthetic */ boolean isDisposed() {
            return super.isDisposed();
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th3) {
            this.sink.error(th3);
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(PooledRef<PooledConnection> pooledRef) {
            this.pooledRef = pooledRef;
            PooledConnection poolable = pooledRef.poolable();
            poolable.pooledRef = this.pooledRef;
            Channel channel = poolable.channel;
            if (!this.currentContext.isEmpty()) {
                ReactorNetty.setChannelContext(channel, this.currentContext);
            }
            if (channel.eventLoop().inEventLoop()) {
                run();
            } else {
                channel.eventLoop().execute(this);
            }
        }

        @Override // reactor.netty.ConnectionObserver
        public void onStateChange(Connection connection, ConnectionObserver.State state) {
            if (state == ConnectionObserver.State.CONFIGURED) {
                this.sink.a(connection);
            }
            this.obs.onStateChange(connection, state);
        }

        @Override // p83.b, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            if (sf.q0(this.subscription, subscription)) {
                this.subscription = subscription;
                this.cancellations.G(this);
                if (!this.retried) {
                    this.sink.d(this.cancellations);
                }
                subscription.request(Clock.MAX_TIME);
            }
        }

        @Override // reactor.netty.ConnectionObserver
        public void onUncaughtException(Connection connection, Throwable th3) {
            this.sink.error(th3);
            this.obs.onUncaughtException(connection, th3);
        }

        void registerClose(PooledRef<PooledConnection> pooledRef, final InstrumentedPool<PooledConnection> instrumentedPool) {
            final Channel channel = pooledRef.poolable().channel;
            r83.a aVar = DefaultPooledConnectionProvider.log;
            if (aVar.isDebugEnabled()) {
                aVar.debug(ReactorNetty.format(channel, "Registering pool release on close event for channel"));
            }
            channel.closeFuture().addListener(new GenericFutureListener() { // from class: reactor.netty.resources.h
                @Override // io.netty.util.concurrent.GenericFutureListener
                public final void operationComplete(Future future) {
                    DefaultPooledConnectionProvider.DisposableAcquire.lambda$registerClose$3(Channel.this, instrumentedPool, future);
                }
            });
        }

        @Override // java.lang.Runnable
        public void run() {
            PooledConnection poolable = this.pooledRef.poolable();
            final Channel channel = poolable.channel;
            if (!channel.isActive()) {
                this.pooledRef.invalidate().subscribe(null, null, new Runnable() { // from class: reactor.netty.resources.e
                    @Override // java.lang.Runnable
                    public final void run() {
                        DefaultPooledConnectionProvider.DisposableAcquire.this.lambda$run$0(channel);
                    }
                });
                if (!this.retried) {
                    r83.a aVar = DefaultPooledConnectionProvider.log;
                    if (aVar.isDebugEnabled()) {
                        aVar.debug(ReactorNetty.format(channel, "Immediately aborted pooled channel, re-acquiring new channel"));
                    }
                    this.pool.acquire(Duration.ofMillis(this.pendingAcquireTimeout)).contextWrite(new Function() { // from class: reactor.netty.resources.f
                        @Override // java.util.function.Function
                        public final Object apply(Object obj) {
                            s83.h lambda$run$1;
                            lambda$run$1 = DefaultPooledConnectionProvider.DisposableAcquire.lambda$run$1(Channel.this, (s83.h) obj);
                            return lambda$run$1;
                        }
                    }).subscribe((p83.b<? super PooledRef<PooledConnection>>) new DisposableAcquire(this));
                    return;
                }
                this.sink.error(new IOException("Error while acquiring from " + this.pool));
                return;
            }
            ConnectionObserver connectionObserver = (ConnectionObserver) channel.attr(DefaultPooledConnectionProvider.OWNER).getAndSet(this);
            if (connectionObserver instanceof PendingConnectionObserver) {
                PendingConnectionObserver pendingConnectionObserver = (PendingConnectionObserver) connectionObserver;
                registerClose(this.pooledRef, this.pool);
                while (true) {
                    PendingConnectionObserver.Pending poll = pendingConnectionObserver.pendingQueue.poll();
                    if (poll == null) {
                        break;
                    }
                    Throwable th3 = poll.error;
                    if (th3 != null) {
                        onUncaughtException(poll.connection, th3);
                    } else {
                        ConnectionObserver.State state = poll.state;
                        if (state != null) {
                            onStateChange(poll.connection, state);
                        }
                    }
                }
                connectionObserver = null;
            } else if (connectionObserver == null) {
                registerClose(this.pooledRef, this.pool);
            }
            if (connectionObserver == null) {
                if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                    PooledConnectionProvider.logPoolState(channel, this.pool, "Channel connected");
                }
                if (this.opsFactory == ChannelOperations.OnSetup.empty()) {
                    this.sink.a(Connection.from(channel));
                    return;
                }
                return;
            }
            if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                PooledConnectionProvider.logPoolState(channel, this.pool, "Channel acquired");
            }
            this.obs.onStateChange(poolable, ConnectionObserver.State.ACQUIRED);
            ChannelOperations<?, ?> create = this.opsFactory.create(poolable, poolable, null);
            if (create == null) {
                this.sink.a(poolable);
                return;
            }
            if (channel.pipeline().get(NettyPipeline.H2MultiplexHandler) != null) {
                this.sink.a(create);
                this.obs.onStateChange(poolable, ConnectionObserver.State.CONFIGURED);
            } else {
                create.bind();
                this.sink.a(create);
                this.obs.onStateChange(create, ConnectionObserver.State.CONFIGURED);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes10.dex */
    public static final class PendingConnectionObserver implements ConnectionObserver {
        final s83.h context;
        final Queue<Pending> pendingQueue;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: classes10.dex */
        public static class Pending {
            final Connection connection;
            final Throwable error;
            final ConnectionObserver.State state;

            Pending(Connection connection, Throwable th3, ConnectionObserver.State state) {
                this.connection = connection;
                this.error = th3;
                this.state = state;
            }
        }

        public PendingConnectionObserver() {
            this(s83.h.empty());
        }

        public PendingConnectionObserver(s83.h hVar) {
            this.pendingQueue = (Queue) reactor.util.concurrent.k.z(4).get();
            this.context = hVar;
        }

        @Override // reactor.netty.ConnectionObserver
        public s83.h currentContext() {
            return this.context;
        }

        @Override // reactor.netty.ConnectionObserver
        public void onStateChange(Connection connection, ConnectionObserver.State state) {
            this.pendingQueue.add(new Pending(connection, null, state));
        }

        @Override // reactor.netty.ConnectionObserver
        public void onUncaughtException(Connection connection, Throwable th3) {
            this.pendingQueue.add(new Pending(connection, th3, null));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes10.dex */
    public static final class PooledConnection extends AtomicLong implements Connection, ConnectionObserver {
        final Channel channel;
        final Sinks.c<Void> onTerminate = Sinks.c().empty();
        final InstrumentedPool<PooledConnection> pool;
        PooledRef<PooledConnection> pooledRef;

        PooledConnection(Channel channel, InstrumentedPool<PooledConnection> instrumentedPool) {
            this.channel = channel;
            this.pool = instrumentedPool;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public /* synthetic */ void lambda$onStateChange$0(ConnectionObserver connectionObserver, Connection connection, Throwable th3) {
            if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                PooledConnectionProvider.logPoolState(this.pooledRef.poolable().channel, this.pool, "Failed cleaning the channel from pool", th3);
            }
            this.onTerminate.B();
            connectionObserver.onStateChange(connection, ConnectionObserver.State.RELEASED);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public /* synthetic */ void lambda$onStateChange$1(ConnectionObserver connectionObserver, Connection connection) {
            if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                PooledConnectionProvider.logPoolState(this.pooledRef.poolable().channel, this.pool, "Channel cleaned");
            }
            this.onTerminate.B();
            connectionObserver.onStateChange(connection, ConnectionObserver.State.RELEASED);
        }

        @Override // reactor.netty.DisposableChannel
        public Channel channel() {
            return this.channel;
        }

        @Override // reactor.netty.ConnectionObserver
        public s83.h currentContext() {
            return owner().currentContext();
        }

        @Override // reactor.netty.ConnectionObserver
        public void onStateChange(final Connection connection, ConnectionObserver.State state) {
            r83.a aVar = DefaultPooledConnectionProvider.log;
            if (aVar.isDebugEnabled()) {
                aVar.debug(ReactorNetty.format(connection.channel(), "onStateChange({}, {})"), connection, state);
            }
            ConnectionObserver.State state2 = ConnectionObserver.State.DISCONNECTING;
            if (state != state2) {
                owner().onStateChange(connection, state);
                return;
            }
            if (!isPersistent() && this.channel.isActive()) {
                this.channel.close();
                owner().onStateChange(connection, state2);
                return;
            }
            if (!this.channel.isActive()) {
                owner().onStateChange(connection, state2);
                return;
            }
            if (aVar.isDebugEnabled()) {
                aVar.debug(ReactorNetty.format(connection.channel(), "Releasing channel"));
            }
            final ConnectionObserver connectionObserver = (ConnectionObserver) this.channel.attr(DefaultPooledConnectionProvider.OWNER).getAndSet(ConnectionObserver.emptyListener());
            if (ReactorNetty.getChannelContext(this.channel) != null) {
                ReactorNetty.setChannelContext(this.channel, null);
            }
            PooledRef<PooledConnection> pooledRef = this.pooledRef;
            if (pooledRef == null) {
                return;
            }
            pooledRef.release().subscribe(null, new Consumer() { // from class: reactor.netty.resources.i
                @Override // java.util.function.Consumer
                public final void accept(Object obj) {
                    DefaultPooledConnectionProvider.PooledConnection.this.lambda$onStateChange$0(connectionObserver, connection, (Throwable) obj);
                }
            }, new Runnable() { // from class: reactor.netty.resources.j
                @Override // java.lang.Runnable
                public final void run() {
                    DefaultPooledConnectionProvider.PooledConnection.this.lambda$onStateChange$1(connectionObserver, connection);
                }
            });
        }

        @Override // reactor.netty.Connection
        public pa<Void> onTerminate() {
            return this.onTerminate.V().or(onDispose());
        }

        @Override // reactor.netty.ConnectionObserver
        public void onUncaughtException(Connection connection, Throwable th3) {
            owner().onUncaughtException(connection, th3);
        }

        ConnectionObserver owner() {
            AttributeKey<ConnectionObserver> attributeKey;
            ConnectionObserver connectionObserver;
            do {
                Channel channel = this.channel;
                attributeKey = DefaultPooledConnectionProvider.OWNER;
                connectionObserver = (ConnectionObserver) channel.attr(attributeKey).get();
                if (connectionObserver != null) {
                    break;
                }
                connectionObserver = new PendingConnectionObserver();
            } while (!this.channel.attr(attributeKey).compareAndSet(null, connectionObserver));
            return connectionObserver;
        }

        @Override // java.util.concurrent.atomic.AtomicLong
        public String toString() {
            return "PooledConnection{channel=" + this.channel + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes10.dex */
    public static final class PooledConnectionAllocator {
        final TransportConfig config;
        final InstrumentedPool<PooledConnection> pool;
        final SocketAddress remoteAddress;
        final AddressResolverGroup<?> resolver;
        static final BiPredicate<PooledConnection, PooledRefMetadata> DEFAULT_EVICTION_PREDICATE = new BiPredicate() { // from class: reactor.netty.resources.k
            @Override // java.util.function.BiPredicate
            public final boolean test(Object obj, Object obj2) {
                boolean lambda$static$1;
                lambda$static$1 = DefaultPooledConnectionProvider.PooledConnectionAllocator.lambda$static$1((DefaultPooledConnectionProvider.PooledConnection) obj, (PooledRefMetadata) obj2);
                return lambda$static$1;
            }
        };
        static final Function<PooledConnection, Publisher<Void>> DEFAULT_DESTROY_HANDLER = new Function() { // from class: reactor.netty.resources.l
            @Override // java.util.function.Function
            public final Object apply(Object obj) {
                Publisher lambda$static$2;
                lambda$static$2 = DefaultPooledConnectionProvider.PooledConnectionAllocator.lambda$static$2((DefaultPooledConnectionProvider.PooledConnection) obj);
                return lambda$static$2;
            }
        };

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: classes10.dex */
        public final class PooledConnectionInitializer extends ChannelInitializer<Channel> implements p83.b<Channel> {
            PooledConnection pooledConnection;
            final ie<PooledConnection> sink;

            PooledConnectionInitializer(ie<PooledConnection> ieVar) {
                this.sink = ieVar;
            }

            @Override // p83.b
            public /* bridge */ /* synthetic */ s83.h currentContext() {
                return super.currentContext();
            }

            @Override // io.netty.channel.ChannelInitializer
            protected void initChannel(Channel channel) {
                if (DefaultPooledConnectionProvider.log.isDebugEnabled()) {
                    PooledConnectionProvider.logPoolState(channel, PooledConnectionAllocator.this.pool, "Created a new pooled channel");
                }
                PooledConnection pooledConnection = new PooledConnection(channel, PooledConnectionAllocator.this.pool);
                this.pooledConnection = pooledConnection;
                channel.attr(DefaultPooledConnectionProvider.OWNER).compareAndSet(null, new PendingConnectionObserver(s83.h.d(this.sink.c())));
                channel.pipeline().remove(this);
                ChannelPipeline pipeline = channel.pipeline();
                PooledConnectionAllocator pooledConnectionAllocator = PooledConnectionAllocator.this;
                pipeline.addFirst(pooledConnectionAllocator.config.channelInitializer(pooledConnection, pooledConnectionAllocator.remoteAddress, false));
                pooledConnection.bind();
            }

            @Override // org.reactivestreams.Subscriber
            public void onComplete() {
            }

            @Override // org.reactivestreams.Subscriber
            public void onError(Throwable th3) {
                this.sink.error(th3);
            }

            @Override // org.reactivestreams.Subscriber
            public void onNext(Channel channel) {
                this.sink.a(this.pooledConnection);
            }

            @Override // p83.b, org.reactivestreams.Subscriber
            public void onSubscribe(Subscription subscription) {
                subscription.request(Clock.MAX_TIME);
            }
        }

        PooledConnectionAllocator(TransportConfig transportConfig, PooledConnectionProvider.PoolFactory<PooledConnection> poolFactory, SocketAddress socketAddress, AddressResolverGroup<?> addressResolverGroup) {
            this.config = transportConfig;
            this.remoteAddress = socketAddress;
            this.resolver = addressResolverGroup;
            this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public /* synthetic */ void lambda$connectChannel$0(ie ieVar) {
            PooledConnectionInitializer pooledConnectionInitializer = new PooledConnectionInitializer(ieVar);
            EventLoop eventLoop = ieVar.c().hasKey("callereventloop") ? (EventLoop) ieVar.c().get("callereventloop") : null;
            if (eventLoop != null) {
                TransportConnector.connect(this.config, this.remoteAddress, this.resolver, pooledConnectionInitializer, eventLoop, ieVar.c()).subscribe((p83.b<? super Channel>) pooledConnectionInitializer);
            } else {
                TransportConnector.connect(this.config, this.remoteAddress, this.resolver, pooledConnectionInitializer, ieVar.c()).subscribe((p83.b<? super Channel>) pooledConnectionInitializer);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static /* synthetic */ boolean lambda$static$1(PooledConnection pooledConnection, PooledRefMetadata pooledRefMetadata) {
            return (pooledConnection.channel.isActive() && pooledConnection.isPersistent()) ? false : true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static /* synthetic */ Publisher lambda$static$2(PooledConnection pooledConnection) {
            return !pooledConnection.channel.isActive() ? pa.empty() : FutureMono.from(pooledConnection.channel.close());
        }

        Publisher<PooledConnection> connectChannel() {
            return pa.create(new Consumer() { // from class: reactor.netty.resources.m
                @Override // java.util.function.Consumer
                public final void accept(Object obj) {
                    DefaultPooledConnectionProvider.PooledConnectionAllocator.this.lambda$connectChannel$0((ie) obj);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultPooledConnectionProvider(ConnectionProvider.Builder builder) {
        this(builder, null);
    }

    DefaultPooledConnectionProvider(ConnectionProvider.Builder builder, java.time.Clock clock) {
        super(builder, clock);
    }

    @Override // reactor.netty.resources.PooledConnectionProvider
    protected p83.b<PooledRef<PooledConnection>> createDisposableAcquire(TransportConfig transportConfig, ConnectionObserver connectionObserver, long j14, InstrumentedPool<PooledConnection> instrumentedPool, ie<Connection> ieVar, s83.h hVar) {
        return new DisposableAcquire(connectionObserver, transportConfig.channelOperationsProvider(), j14, instrumentedPool, ieVar, hVar);
    }

    @Override // reactor.netty.resources.PooledConnectionProvider
    protected InstrumentedPool<PooledConnection> createPool(TransportConfig transportConfig, PooledConnectionProvider.PoolFactory<PooledConnection> poolFactory, SocketAddress socketAddress, AddressResolverGroup<?> addressResolverGroup) {
        return new PooledConnectionAllocator(transportConfig, poolFactory, socketAddress, addressResolverGroup).pool;
    }
}
