/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.IntBinaryOperator;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jgroups.Address;
import org.jgroups.EmptyMessage;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.ObjectMessage;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.protocols.TCP;
import org.jgroups.protocols.TP;
import org.jgroups.protocols.UnicastHeader;
import org.jgroups.protocols.relay.RELAY;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.AgeOutCache;
import org.jgroups.util.AsciiString;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.Buffer;
import org.jgroups.util.ExpiryCache;
import org.jgroups.util.FastArray;
import org.jgroups.util.LongTuple;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.MessageCache;
import org.jgroups.util.SeqnoList;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.TimeService;
import org.jgroups.util.Tuple;
import org.jgroups.util.Util;

@MBean(description="Reliable unicast layer")
public abstract class ReliableUnicast
extends Protocol
implements AgeOutCache.Handler<Address> {
    protected static final long DEFAULT_FIRST_SEQNO = 1L;
    protected static final long DEFAULT_XMIT_INTERVAL = 500L;
    @Property(description="Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping. Note that this creates lingering connection entries, which increases memory over time.", type=AttributeType.TIME)
    protected long conn_expiry_timeout = 120000L;
    @Property(description="Time (in ms) until a connection marked to be closed will get removed. 0 disables this", type=AttributeType.TIME)
    protected long conn_close_timeout = 240000L;
    protected long max_retransmit_time = 60000L;
    @Property(description="Interval (in milliseconds) at which messages in the send windows are resent", type=AttributeType.TIME)
    protected long xmit_interval = 500L;
    @Property(description="When true, the sender retransmits messages until ack'ed and the receiver asks for missing messages. When false, this is not done, but ack'ing and stale connection testing is still done. https://issues.redhat.com/browse/JGRP-2676", deprecatedMessage="is ignored")
    @Deprecated(since="5.4.4", forRemoval=true)
    protected boolean xmits_enabled = true;
    @Property(description="If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)")
    protected boolean log_not_found_msgs = true;
    @Property(description="Min time (in ms) to elapse for successive SEND_FIRST_SEQNO messages to be sent to the same sender", type=AttributeType.TIME)
    protected long sync_min_interval = 2000L;
    @Property(description="Max number of messages to ask for in a retransmit request. 0 disables this and uses the max bundle size in the transport")
    protected int max_xmit_req_size;
    @Property(description="The max size of a message batch when delivering messages. 0 is unbounded")
    protected int max_batch_size;
    @Property(description="Increment seqno and send a message atomically. Reduces retransmissions. Description in doc/design/NAKACK4.txt ('misc')")
    protected boolean send_atomically;
    @Property(description="Reuses the same message batch for delivery of regular messages (only done by a single thread anyway). Not advisable for buffers that can grow infinitely (NAKACK3)")
    protected boolean reuse_message_batches = true;
    @Property(description="If true, a unicast message to self is looped back up on the same thread. Note that this may cause problems (e.g. deadlocks) in some applications, so make sure that your code can handle this. Issue: https://issues.redhat.com/browse/JGRP-2547")
    protected boolean loopback;
    protected static final int DEFAULT_INITIAL_CAPACITY = 128;
    protected static final int DEFAULT_INCREMENT = 512;
    @ManagedAttribute(description="Number of message sent", type=AttributeType.SCALAR)
    protected final LongAdder num_msgs_sent = new LongAdder();
    @ManagedAttribute(description="Number of message received", type=AttributeType.SCALAR)
    protected final LongAdder num_msgs_received = new LongAdder();
    @ManagedAttribute(description="Number of acks sent", type=AttributeType.SCALAR)
    protected final LongAdder num_acks_sent = new LongAdder();
    @ManagedAttribute(description="Number of acks received", type=AttributeType.SCALAR)
    protected final LongAdder num_acks_received = new LongAdder();
    @ManagedAttribute(description="Number of retransmitted messages", type=AttributeType.SCALAR)
    protected final LongAdder num_xmits = new LongAdder();
    @ManagedAttribute(description="Number of retransmit requests received", type=AttributeType.SCALAR)
    protected final LongAdder xmit_reqs_received = new LongAdder();
    @ManagedAttribute(description="Number of retransmit requests sent", type=AttributeType.SCALAR)
    protected final LongAdder xmit_reqs_sent = new LongAdder();
    @ManagedAttribute(description="Number of retransmit responses sent", type=AttributeType.SCALAR)
    protected final LongAdder xmit_rsps_sent = new LongAdder();
    @ManagedAttribute(description="Average batch size of messages delivered to the application")
    protected final AverageMinMax avg_delivery_batch_size = new AverageMinMax(1024);
    @ManagedAttribute(description="True if sending a message can block at the transport level")
    protected boolean sends_can_block = true;
    @ManagedAttribute(description="tracing is enabled or disabled for the given log", writable=true)
    protected boolean is_trace = this.log.isTraceEnabled();
    @ManagedAttribute(description="Whether or not a RELAY protocol was found below in the stack")
    protected boolean relay_present;
    protected final Map<Address, SenderEntry> send_table = Util.createConcurrentMap();
    protected final Map<Address, ReceiverEntry> recv_table = Util.createConcurrentMap();
    protected final Map<Address, MessageBatch> cached_batches = Util.createConcurrentMap();
    protected final ReentrantLock recv_table_lock = new ReentrantLock();
    protected final Map<Address, Long> xmit_task_map = new ConcurrentHashMap<Address, Long>();
    protected Future<?> xmit_task;
    protected volatile List<Address> members = new ArrayList<Address>(11);
    protected TimeScheduler timer;
    protected volatile boolean running;
    protected short last_conn_id;
    protected AgeOutCache<Address> cache;
    protected TimeService time_service;
    protected final AtomicInteger timestamper = new AtomicInteger(0);
    protected ExpiryCache<Address> last_sync_sent;
    @ManagedAttribute(description="Number of unicast messages to self looped back up", type=AttributeType.SCALAR)
    protected final LongAdder num_loopbacks = new LongAdder();
    protected final MessageCache msg_cache = new MessageCache();
    protected static final Message DUMMY_OOB_MSG = new EmptyMessage().setFlag(Message.Flag.OOB);
    protected final Predicate<Message> drop_oob_and_dont_loopback_msgs_filter = msg -> !(msg == null || msg == DUMMY_OOB_MSG || msg.isFlagSet(Message.Flag.OOB) && !msg.setFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED) || msg.isFlagSet(Message.TransientFlag.DONT_LOOPBACK) && Objects.equals(this.local_addr, msg.getSrc()));
    protected static final Predicate<Message> remove_filter = m -> m != null && (m.isFlagSet(Message.TransientFlag.DONT_LOOPBACK) || m == DUMMY_OOB_MSG || m.isFlagSet(Message.TransientFlag.OOB_DELIVERED));
    protected static final BiConsumer<MessageBatch, Message> BATCH_ACCUMULATOR = MessageBatch::add;

    protected abstract Buffer<Message> createBuffer(long var1);

    protected abstract boolean needToSendAck(Entry var1, int var2);

    public long getNumLoopbacks() {
        return this.num_loopbacks.sum();
    }

    @ManagedAttribute(description="Returns the number of outgoing (send) connections", type=AttributeType.SCALAR)
    public int getNumSendConnections() {
        return this.send_table.size();
    }

    @ManagedAttribute(description="Returns the number of incoming (receive) connections", type=AttributeType.SCALAR)
    public int getNumReceiveConnections() {
        return this.recv_table.size();
    }

    @ManagedAttribute(description="Returns the total number of outgoing (send) and incoming (receive) connections", type=AttributeType.SCALAR)
    public int getNumConnections() {
        return this.getNumReceiveConnections() + this.getNumSendConnections();
    }

    @ManagedAttribute(description="Next seqno issued by the timestamper", type=AttributeType.SCALAR)
    public int getTimestamper() {
        return this.timestamper.get();
    }

    @Override
    @Property(name="level", description="Sets the level")
    public <T extends Protocol> T setLevel(String level) {
        Object retval = super.setLevel(level);
        this.is_trace = this.log.isTraceEnabled();
        return retval;
    }

    public long getXmitInterval() {
        return this.xmit_interval;
    }

    public ReliableUnicast setXmitInterval(long i) {
        this.xmit_interval = i;
        return this;
    }

    public static boolean isXmitsEnabled() {
        return true;
    }

    public ReliableUnicast setXmitsEnabled(boolean b) {
        return this;
    }

    public long getConnExpiryTimeout() {
        return this.conn_expiry_timeout;
    }

    public ReliableUnicast setConnExpiryTimeout(long c) {
        this.conn_expiry_timeout = c;
        return this;
    }

    public long getConnCloseTimeout() {
        return this.conn_close_timeout;
    }

    public ReliableUnicast setConnCloseTimeout(long c) {
        this.conn_close_timeout = c;
        return this;
    }

    public boolean logNotFoundMsgs() {
        return this.log_not_found_msgs;
    }

    public ReliableUnicast logNotFoundMsgs(boolean l) {
        this.log_not_found_msgs = l;
        return this;
    }

    public long getSyncMinInterval() {
        return this.sync_min_interval;
    }

    public ReliableUnicast setSyncMinInterval(long s) {
        this.sync_min_interval = s;
        return this;
    }

    public int getMaxXmitReqSize() {
        return this.max_xmit_req_size;
    }

    public ReliableUnicast setMaxXmitReqSize(int m) {
        this.max_xmit_req_size = m;
        return this;
    }

    public boolean reuseMessageBatches() {
        return this.reuse_message_batches;
    }

    public ReliableUnicast reuseMessageBatches(boolean b) {
        this.reuse_message_batches = b;
        return this;
    }

    public boolean sendsCanBlock() {
        return this.sends_can_block;
    }

    public ReliableUnicast sendsCanBlock(boolean s) {
        this.sends_can_block = s;
        return this;
    }

    public boolean sendAtomically() {
        return this.send_atomically;
    }

    public ReliableUnicast sendAtomically(boolean f) {
        this.send_atomically = f;
        return this;
    }

    public boolean loopback() {
        return this.loopback;
    }

    public ReliableUnicast loopback(boolean b) {
        this.loopback = b;
        return this;
    }

    public ReliableUnicast timeService(TimeService ts) {
        this.time_service = ts;
        return this;
    }

    public ReliableUnicast lastSync(ExpiryCache<Address> c) {
        this.last_sync_sent = c;
        return this;
    }

    @ManagedOperation
    public String printConnections() {
        StringBuilder sb = new StringBuilder();
        if (!this.send_table.isEmpty()) {
            sb.append("\nsend connections:\n");
            for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
                sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
            }
        }
        if (!this.recv_table.isEmpty()) {
            sb.append("\nreceive connections:\n");
            for (Map.Entry<Address, Entry> entry : this.recv_table.entrySet()) {
                sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
            }
        }
        return sb.toString();
    }

    @ManagedOperation(description="Prints the cached batches (if reuse_message_batches is true)")
    public String printCachedBatches() {
        return "\n" + this.cached_batches.entrySet().stream().map(e -> String.format("%s: %s", e.getKey(), e.getValue())).collect(Collectors.joining("\n"));
    }

    @ManagedOperation(description="Prints the cached batches (if reuse_message_batches is true)")
    public ReliableUnicast clearCachedBatches() {
        this.cached_batches.clear();
        return this;
    }

    @ManagedOperation(description="Adjust the capacity of cached batches")
    public ReliableUnicast trimCachedBatches() {
        this.cached_batches.values().forEach(mb -> mb.array().trimTo(128));
        return this;
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    @Deprecated
    public long getNumMessagesSent() {
        return this.num_msgs_sent.sum();
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    @Deprecated
    public long getNumMessagesReceived() {
        return this.num_msgs_received.sum();
    }

    public long getNumAcksSent() {
        return this.num_acks_sent.sum();
    }

    public long getNumAcksReceived() {
        return this.num_acks_received.sum();
    }

    public long getNumXmits() {
        return this.num_xmits.sum();
    }

    public long getMaxRetransmitTime() {
        return this.max_retransmit_time;
    }

    @Property(description="Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this", type=AttributeType.TIME)
    public ReliableUnicast setMaxRetransmitTime(long max_retransmit_time) {
        this.max_retransmit_time = max_retransmit_time;
        if (this.cache != null && max_retransmit_time > 0L) {
            this.cache.setTimeout(max_retransmit_time);
        }
        return this;
    }

    @ManagedAttribute(description="Is the retransmit task running")
    public boolean isXmitTaskRunning() {
        return this.xmit_task != null && !this.xmit_task.isDone();
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    public int getAgeOutCacheSize() {
        return this.cache != null ? this.cache.size() : 0;
    }

    @ManagedOperation
    public String printAgeOutCache() {
        return this.cache != null ? this.cache.toString() : "n/a";
    }

    public AgeOutCache<Address> getAgeOutCache() {
        return this.cache;
    }

    public boolean hasSendConnectionTo(Address dest) {
        Entry entry = this.send_table.get(dest);
        return entry != null && entry.state() == State.OPEN;
    }

    @ManagedAttribute(type=AttributeType.SCALAR)
    public int getNumUnackedMessages() {
        return ReliableUnicast.accumulate(Buffer::size, this.send_table.values());
    }

    public int getNumUnackedMessages(Address dest) {
        Entry entry = this.send_table.get(dest);
        return entry != null ? entry.buf.size() : 0;
    }

    @ManagedAttribute(description="Total number of undelivered messages in all receive windows", type=AttributeType.SCALAR)
    public int getXmitTableUndeliveredMessages() {
        return ReliableUnicast.accumulate(Buffer::size, this.recv_table.values());
    }

    @ManagedAttribute(description="Total number of missing messages in all receive windows", type=AttributeType.SCALAR)
    public int getXmitTableMissingMessages() {
        return ReliableUnicast.accumulate(Buffer::numMissing, this.recv_table.values());
    }

    @ManagedAttribute(description="Total number of deliverable messages in all receive windows", type=AttributeType.SCALAR)
    public int getXmitTableDeliverableMessages() {
        return ReliableUnicast.accumulate(Buffer::getNumDeliverable, this.recv_table.values());
    }

    @ManagedOperation(description="Prints the contents of the receive windows for all members")
    public String printReceiveWindowMessages() {
        StringBuilder ret = new StringBuilder(this.local_addr + ":\n");
        for (Map.Entry<Address, ReceiverEntry> entry : this.recv_table.entrySet()) {
            Address addr = entry.getKey();
            Buffer buf = entry.getValue().buf;
            ret.append(addr).append(": ").append(buf).append('\n');
        }
        return ret.toString();
    }

    @ManagedOperation(description="Prints the contents of the send windows for all members")
    public String printSendWindowMessages() {
        StringBuilder ret = new StringBuilder(this.local_addr + ":\n");
        for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
            Address addr = entry.getKey();
            Buffer buf = entry.getValue().buf;
            ret.append(addr).append(": ").append(buf).append('\n');
        }
        return ret.toString();
    }

    @Override
    public void resetStats() {
        this.avg_delivery_batch_size.clear();
        Stream.of(this.num_msgs_sent, this.num_msgs_received, this.num_acks_sent, this.num_acks_received, this.num_xmits, this.xmit_reqs_received, this.xmit_reqs_sent, this.xmit_rsps_sent, this.num_loopbacks).forEach(LongAdder::reset);
        this.send_table.values().stream().map(e -> e.buf).forEach(Buffer::resetStats);
        this.recv_table.values().stream().map(e -> e.buf).forEach(Buffer::resetStats);
    }

    @Override
    public void init() throws Exception {
        super.init();
        TP transport = this.getTransport();
        this.sends_can_block = transport instanceof TCP;
        this.time_service = transport.getTimeService();
        if (this.time_service == null) {
            throw new IllegalStateException("time service from transport is null");
        }
        this.last_sync_sent = new ExpiryCache(this.sync_min_interval);
        int estimated_max_msgs_in_xmit_req = (transport.getBundler().getMaxSize() - 50) * 8;
        int old_max_xmit_size = this.max_xmit_req_size;
        this.max_xmit_req_size = this.max_xmit_req_size <= 0 ? estimated_max_msgs_in_xmit_req : Math.min(this.max_xmit_req_size, estimated_max_msgs_in_xmit_req);
        if (old_max_xmit_size != this.max_xmit_req_size) {
            this.log.trace("%s: set max_xmit_req_size from %d to %d", this.local_addr, old_max_xmit_size, this.max_xmit_req_size);
        }
        if (this.xmit_interval <= 0L) {
            this.log.warn("%s: xmit_interval (%d) has to be > 0; setting it to the default of %d", this.local_addr, this.xmit_interval, 500L);
            this.xmit_interval = 500L;
        }
        this.relay_present = ProtocolStack.findProtocol(this.down_prot, true, RELAY.class) != null;
    }

    @Override
    public void start() throws Exception {
        this.msg_cache.clear();
        this.timer = this.getTransport().getTimer();
        if (this.timer == null) {
            throw new Exception("timer is null");
        }
        if (this.max_retransmit_time > 0L) {
            this.cache = new AgeOutCache(this.timer, this.max_retransmit_time, this);
        }
        this.running = true;
        this.startRetransmitTask();
    }

    @Override
    public void stop() {
        this.sendPendingAcks();
        this.running = false;
        this.stopRetransmitTask();
        this.xmit_task_map.clear();
        this.removeAllConnections();
        this.msg_cache.clear();
    }

    protected void handleUpEvent(Address sender, Message msg, UnicastHeader hdr) {
        try {
            switch (hdr.type) {
                case 0: {
                    throw new IllegalStateException("header of type DATA is not supposed to be handled by this method");
                }
                case 1: {
                    this.handleAckReceived(sender, hdr.seqno, hdr.conn_id, hdr.timestamp());
                    break;
                }
                case 2: {
                    this.handleResendingOfFirstMessage(sender, hdr.timestamp());
                    break;
                }
                case 3: {
                    this.handleXmitRequest(sender, (SeqnoList)msg.getObject());
                    break;
                }
                case 4: {
                    this.log.trace("%s <-- %s: CLOSE(conn-id=%s)", this.local_addr, sender, hdr.conn_id);
                    ReceiverEntry entry = this.recv_table.get(sender);
                    if (entry != null && entry.connId() == hdr.conn_id) {
                        this.recv_table.remove(sender, entry);
                        this.log.trace("%s: removed receive connection for %s", this.local_addr, sender);
                    }
                    break;
                }
                default: {
                    this.log.error(Util.getMessage("TypeNotKnown"), this.local_addr, hdr.type);
                    break;
                }
            }
        }
        catch (Throwable t) {
            this.log.error(Util.getMessage("FailedHandlingEvent"), this.local_addr, t);
        }
    }

    @Override
    public Object up(Message msg) {
        Address dest = msg.dest();
        Address sender = msg.src();
        if (dest == null || dest.isMulticast() || msg.isFlagSet(Message.Flag.NO_RELIABILITY)) {
            return this.up_prot.up(msg);
        }
        UnicastHeader hdr = (UnicastHeader)msg.getHeader(this.id);
        if (hdr == null) {
            return this.up_prot.up(msg);
        }
        switch (hdr.type) {
            case 0: {
                if (this.is_trace) {
                    this.log.trace("%s <-- %s: DATA(#%d, conn_id=%d%s)", this.local_addr, sender, hdr.seqno, hdr.conn_id, hdr.first ? ", first" : "");
                }
                if (Objects.equals(this.local_addr, sender)) {
                    this.handleDataReceivedFromSelf(sender, hdr.seqno, msg);
                    break;
                }
                this.handleDataReceived(sender, hdr.seqno, hdr.conn_id, hdr.first, msg);
                break;
            }
            default: {
                this.handleUpEvent(sender, msg, hdr);
            }
        }
        return null;
    }

    @Override
    public void up(MessageBatch batch) {
        Collection<Message> queued_msgs;
        ReceiverEntry entry;
        if (batch.dest() == null || batch.dest().isMulticast()) {
            this.up_prot.up(batch);
            return;
        }
        Address sender = batch.sender();
        if (this.local_addr == null || this.local_addr.equals(sender)) {
            Entry entry2;
            Entry entry3 = entry2 = this.local_addr != null ? (Entry)this.send_table.get(this.local_addr) : null;
            if (entry2 != null) {
                this.handleBatchFromSelf(batch, entry2);
            }
            return;
        }
        int size = batch.size();
        short highest_conn_id = 0;
        long lowest_seqno = -1L;
        boolean first_seqno = false;
        ConcurrentHashMap<Short, List> msgs = new ConcurrentHashMap<Short, List>();
        Iterator<Message> it = batch.iterator();
        while (it.hasNext()) {
            Object hdr;
            Message msg = it.next();
            if (msg == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY) || (hdr = (UnicastHeader)msg.getHeader(this.id)) == null) continue;
            it.remove();
            if (((UnicastHeader)hdr).type != 0) {
                this.handleUpEvent(msg.getSrc(), msg, (UnicastHeader)hdr);
                continue;
            }
            highest_conn_id = ReliableUnicast.max(((UnicastHeader)hdr).conn_id, highest_conn_id);
            if (lowest_seqno < 0L) {
                lowest_seqno = ((UnicastHeader)hdr).seqno;
                first_seqno = ((UnicastHeader)hdr).first;
            } else if (lowest_seqno > ((UnicastHeader)hdr).seqno) {
                lowest_seqno = ((UnicastHeader)hdr).seqno;
                first_seqno = ((UnicastHeader)hdr).first;
            }
            List list = msgs.computeIfAbsent(((UnicastHeader)hdr).conn_id, k -> new FastArray(size));
            list.add(new LongTuple<Message>(((UnicastHeader)hdr).seqno(), msg));
        }
        if (msgs.isEmpty()) {
            this.up_prot.up(batch);
            return;
        }
        List list = (List)msgs.get(highest_conn_id);
        if (msgs.size() > 1) {
            msgs.keySet().retainAll(List.of(Short.valueOf(highest_conn_id)));
            Tuple<Long, Boolean> tuple = ReliableUnicast.getLowestSeqno(this.id, list);
            lowest_seqno = tuple.getVal1();
            first_seqno = tuple.getVal2();
        }
        if ((entry = this.getReceiverEntry(sender, lowest_seqno, first_seqno, highest_conn_id, batch.dest())) == null) {
            if (!list.isEmpty()) {
                for (LongTuple tuple : list) {
                    this.msg_cache.add(sender, (Message)tuple.getVal2());
                    this.log.trace("%s: cached %s#%d", this.local_addr, sender, tuple.getVal1());
                }
            }
            return;
        }
        boolean added_queued_msgs = false;
        if (!this.msg_cache.isEmpty() && (queued_msgs = this.msg_cache.drain(sender)) != null) {
            this.addQueuedMessages(sender, entry, queued_msgs);
            added_queued_msgs = true;
        }
        if (added_queued_msgs || list != null && !list.isEmpty()) {
            this.handleBatchReceived(entry, sender, list, batch.mode() == MessageBatch.Mode.OOB, batch.dest());
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    protected void handleBatchFromSelf(MessageBatch batch, Entry entry) {
        ArrayList<LongTuple<Message>> list = new ArrayList<LongTuple<Message>>(batch.size());
        Iterator<Message> it = batch.iterator();
        while (it.hasNext()) {
            UnicastHeader hdr;
            Message msg = it.next();
            if (msg == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY) || (hdr = (UnicastHeader)msg.getHeader(this.id)) == null) continue;
            it.remove();
            if (hdr.type != 0) {
                this.handleUpEvent(msg.getSrc(), msg, hdr);
                continue;
            }
            if (entry.conn_id != hdr.conn_id) {
                it.remove();
                continue;
            }
            list.add(new LongTuple<Message>(hdr.seqno(), msg));
        }
        if (!list.isEmpty()) {
            if (this.is_trace) {
                this.log.trace("%s <-- %s: DATA(%s)", this.local_addr, batch.sender(), this.printMessageList(list));
            }
            int len = list.size();
            Buffer<Message> win = entry.buf;
            this.update(entry, len);
            if (batch.mode() == MessageBatch.Mode.OOB) {
                MessageBatch oob_batch = new MessageBatch(this.local_addr, batch.sender(), batch.clusterName(), batch.multicast(), MessageBatch.Mode.OOB, len);
                for (LongTuple longTuple : list) {
                    long seq = longTuple.getVal1();
                    Message msg = win.get(seq);
                    if (msg == null || !msg.isFlagSet(Message.Flag.OOB) || !msg.setFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) continue;
                    oob_batch.add(msg);
                }
                this.deliverBatch(oob_batch, entry, batch.dest());
            }
            this.removeAndDeliver(entry, batch.sender(), batch.clusterName(), batch.capacity());
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 6: {
                View view = (View)evt.getArg();
                List<Address> new_members = view.getMembers();
                HashSet<Address> non_members = new HashSet<Address>(this.send_table.keySet());
                non_members.addAll(this.recv_table.keySet());
                this.members = new_members;
                new_members.forEach(non_members::remove);
                if (this.cache != null) {
                    this.cache.removeAll(new_members);
                }
                if (!non_members.isEmpty()) {
                    this.log.trace("%s: closing connections to non members %s", this.local_addr, non_members);
                    non_members.stream().filter(this::isLocal).forEach(this::closeConnection);
                }
                if (!new_members.isEmpty()) {
                    for (Address mbr : new_members) {
                        Entry e = this.send_table.get(mbr);
                        if (e != null && e.state() == State.CLOSING) {
                            e.state(State.OPEN);
                        }
                        if ((e = (Entry)this.recv_table.get(mbr)) == null || e.state() != State.CLOSING) continue;
                        e.state(State.OPEN);
                    }
                }
                this.xmit_task_map.keySet().retainAll(new_members);
                this.last_sync_sent.removeExpiredElements();
                this.cached_batches.keySet().retainAll(new_members);
            }
        }
        return this.down_prot.down(evt);
    }

    @Override
    public Object down(Message msg) {
        boolean dont_loopback_set;
        Address dst = msg.getDest();
        if (dst == null || dst.isMulticast() || msg.isFlagSet(Message.Flag.NO_RELIABILITY)) {
            return this.down_prot.down(msg);
        }
        if (!this.running) {
            this.log.trace("%s: discarded message as start() has not yet been called, message: %s", this.local_addr, msg);
            return null;
        }
        if (msg.getSrc() == null) {
            msg.setSrc(this.local_addr);
        }
        if (this.isLocalSiteMaster(dst)) {
            dst = this.local_addr;
            msg.dest(dst);
        }
        if (this.loopback && Objects.equals(this.local_addr, dst)) {
            if (msg.isFlagSet(Message.TransientFlag.DONT_LOOPBACK)) {
                return null;
            }
            this.num_loopbacks.increment();
            return this.up_prot.up(msg);
        }
        SenderEntry entry = this.getSenderEntry(dst);
        boolean bl = dont_loopback_set = msg.isFlagSet(Message.TransientFlag.DONT_LOOPBACK) && dst.equals(this.local_addr);
        if (this.send(msg, entry, dont_loopback_set)) {
            this.num_msgs_sent.increment();
        } else {
            this.log.trace("%s: dropped message due to closed send buffer, message: %s", this.local_addr, msg);
        }
        return null;
    }

    protected boolean isLocalSiteMaster(Address dest) {
        if (this.relay_present && dest.isSiteMaster()) {
            Object ret = this.down_prot.down(new Event(115, dest));
            return ret != null && (Boolean)ret != false;
        }
        return false;
    }

    protected boolean isLocal(Address addr) {
        if (this.relay_present && addr.isSiteAddress()) {
            Object ret = this.down_prot.down(new Event(116, addr));
            return ret != null && (Boolean)ret != false;
        }
        return true;
    }

    public void closeConnection(Address mbr) {
        this.closeSendConnection(mbr);
        this.closeReceiveConnection(mbr);
    }

    public void closeSendConnection(Address mbr) {
        SenderEntry entry = this.send_table.get(mbr);
        if (entry != null) {
            entry.state(State.CLOSING);
        }
    }

    public void closeReceiveConnection(Address mbr) {
        ReceiverEntry entry = this.recv_table.get(mbr);
        if (entry != null) {
            entry.state(State.CLOSING);
        }
    }

    public void removeSendConnection(Address mbr) {
        SenderEntry entry = this.send_table.remove(mbr);
        if (entry != null) {
            entry.state(State.CLOSED);
            if (this.members.contains(mbr)) {
                this.sendClose(mbr, entry.connId());
            }
        }
    }

    public void removeReceiveConnection(Address mbr) {
        this.sendPendingAcks();
        ReceiverEntry entry = this.recv_table.remove(mbr);
        if (entry != null) {
            entry.state(State.CLOSED);
        }
    }

    @ManagedOperation(description="Trashes all connections to other nodes. This is only used for testing")
    public void removeAllConnections() {
        for (SenderEntry se : this.send_table.values()) {
            se.state(State.CLOSED);
        }
        this.send_table.clear();
        this.recv_table.clear();
    }

    protected void retransmit(SeqnoList missing, Address sender, Address real_dest) {
        Message xmit_msg = new ObjectMessage(sender, missing).setFlag(Message.Flag.OOB, Message.Flag.NO_FC).putHeader(this.id, UnicastHeader.createXmitReqHeader());
        if (!Objects.equals(this.local_addr, real_dest)) {
            xmit_msg.setSrc(real_dest);
        }
        if (this.is_trace) {
            this.log.trace("%s --> %s: XMIT_REQ(%s)", this.local_addr, sender, missing);
        }
        this.down_prot.down(xmit_msg);
        this.xmit_reqs_sent.add(missing.size());
    }

    protected void retransmit(Message msg) {
        if (this.is_trace) {
            UnicastHeader hdr = (UnicastHeader)msg.getHeader(this.id);
            long seqno = hdr != null ? hdr.seqno : -1L;
            this.log.trace("%s --> %s: resending(#%d)", this.local_addr, msg.getDest(), seqno);
        }
        this.resend(msg);
        this.num_xmits.increment();
    }

    @Override
    public void expired(Address key) {
        if (key != null) {
            this.log.debug("%s: removing expired connection to %s", this.local_addr, key);
            this.closeConnection(key);
        }
    }

    protected void handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg) {
        Collection<Message> queued_msgs;
        ReceiverEntry entry = this.getReceiverEntry(sender, seqno, first, conn_id, msg.dest());
        if (entry == null) {
            this.msg_cache.add(sender, msg);
            this.log.trace("%s: cached %s#%d", this.local_addr, sender, seqno);
            return;
        }
        if (!this.msg_cache.isEmpty() && (queued_msgs = this.msg_cache.drain(sender)) != null) {
            this.addQueuedMessages(sender, entry, queued_msgs);
        }
        this.addMessage(entry, sender, seqno, msg);
        this.removeAndDeliver(entry, sender, null, 1);
    }

    protected void addMessage(ReceiverEntry entry, Address sender, long seqno, Message msg) {
        Buffer win = entry.buf();
        this.update(entry, 1);
        boolean oob = msg.isFlagSet(Message.Flag.OOB);
        boolean added = win.add(seqno, oob ? DUMMY_OOB_MSG : msg);
        if (oob && added) {
            this.deliverMessage(msg, sender, seqno);
        }
        if (this.needToSendAck(entry, 1)) {
            this.sendAck(sender, entry, msg.dest());
        }
    }

    protected void addQueuedMessages(Address sender, ReceiverEntry entry, Collection<Message> queued_msgs) {
        for (Message msg : queued_msgs) {
            UnicastHeader hdr = (UnicastHeader)msg.getHeader(this.id);
            if (hdr.conn_id != entry.conn_id) {
                this.log.warn("%s: dropped queued message %s#%d as its conn_id (%d) did not match (entry.conn_id=%d)", this.local_addr, sender, hdr.seqno, hdr.conn_id, entry.conn_id);
                continue;
            }
            this.addMessage(entry, sender, hdr.seqno(), msg);
        }
    }

    protected void handleDataReceivedFromSelf(Address sender, long seqno, Message msg) {
        Entry entry = this.send_table.get(sender);
        if (entry == null || entry.state() == State.CLOSED) {
            this.log.warn("%s: entry not found for %s; dropping message", this.local_addr, sender);
            return;
        }
        this.update(entry, 1);
        Buffer<Message> win = entry.buf;
        if (msg.isFlagSet(Message.Flag.OOB) && (msg = win.get(seqno)) != null && msg.isFlagSet(Message.Flag.OOB) && msg.setFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) {
            this.deliverMessage(msg, sender, seqno);
        }
        this.removeAndDeliver(entry, sender, null, 1);
    }

    protected void handleBatchReceived(ReceiverEntry entry, Address sender, List<LongTuple<Message>> msgs, boolean oob, Address original_dest) {
        if (this.is_trace) {
            this.log.trace("%s <-- %s: DATA(%s)", this.local_addr, sender, this.printMessageList(msgs));
        }
        int batch_size = msgs.size();
        Buffer buf = entry.buf;
        boolean added = buf.add(msgs, oob, oob ? DUMMY_OOB_MSG : null);
        this.update(entry, batch_size);
        entry.sendAck();
        if (added && oob) {
            MessageBatch oob_batch = new MessageBatch(this.local_addr, sender, null, false, MessageBatch.Mode.OOB, msgs.size());
            for (LongTuple<Message> tuple : msgs) {
                oob_batch.add(tuple.getVal2());
            }
            this.deliverBatch(oob_batch, entry, original_dest);
        }
        this.removeAndDeliver(entry, sender, null, msgs.size());
    }

    protected void removeAndDeliver(Entry entry, Address sender, AsciiString cluster, int min_size) {
        Buffer<Message> buf = entry.buf();
        AtomicInteger adders = buf.getAdders();
        if (adders.getAndIncrement() != 0) {
            return;
        }
        AsciiString cl = cluster != null ? cluster : this.getTransport().getClusterNameAscii();
        int cap = Math.max(Math.max(Math.max(buf.size(), this.max_batch_size), min_size), 128);
        MessageBatch batch = this.reuse_message_batches && cl != null ? this.cached_batches.computeIfAbsent(sender, __ -> new MessageBatch(cap).dest(this.local_addr).sender(sender).cluster(cl).mcast(true).increment(512)) : new MessageBatch(cap).dest(this.local_addr).sender(sender).cluster(cl).multicast(true).increment(512);
        Supplier<MessageBatch> batch_creator = () -> batch;
        MessageBatch mb = null;
        do {
            try {
                batch.reset();
                mb = buf.removeMany(true, this.max_batch_size, this.drop_oob_and_dont_loopback_msgs_filter, batch_creator, BATCH_ACCUMULATOR);
            }
            catch (Throwable t) {
                this.log.error("%s: failed removing messages from table for %s: %s", this.local_addr, sender, t);
            }
            if (batch.isEmpty()) continue;
            this.deliverBatch(batch, entry, null);
        } while (mb != null || adders.decrementAndGet() != 0);
    }

    protected String printMessageList(List<LongTuple<Message>> list) {
        UnicastHeader hdr;
        Message second;
        StringBuilder sb = new StringBuilder();
        int size = list.size();
        Message first = size > 0 ? list.get(0).getVal2() : null;
        Message message = second = size > 1 ? list.get(size - 1).getVal2() : first;
        if (first != null && (hdr = (UnicastHeader)first.getHeader(this.id)) != null) {
            sb.append("#" + hdr.seqno);
        }
        if (second != null && (hdr = (UnicastHeader)second.getHeader(this.id)) != null) {
            sb.append(" - #" + hdr.seqno);
        }
        return sb.toString();
    }

    protected ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address real_dest) {
        ReceiverEntry entry = this.recv_table.get(sender);
        if (entry != null && entry.connId() == conn_id) {
            return entry;
        }
        return this._getReceiverEntry(sender, seqno, first, conn_id, real_dest);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ReceiverEntry _getReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address real_dest) {
        this.recv_table_lock.lock();
        try {
            ReceiverEntry entry = this.recv_table.get(sender);
            if (entry == null) {
                if (first) {
                    ReceiverEntry receiverEntry = this.createReceiverEntry(sender, seqno, conn_id, real_dest);
                    return receiverEntry;
                }
                this.recv_table_lock.unlock();
                this.sendRequestForFirstSeqno(sender, real_dest);
                ReceiverEntry receiverEntry = null;
                return receiverEntry;
            }
            ReceiverEntry receiverEntry = this.compareConnIds(conn_id, entry.connId(), first, entry, sender, seqno, real_dest);
            return receiverEntry;
        }
        finally {
            if (this.recv_table_lock.isHeldByCurrentThread()) {
                this.recv_table_lock.unlock();
            }
        }
    }

    protected ReceiverEntry compareConnIds(short other, short mine, boolean first, ReceiverEntry e, Address sender, long seqno, Address real_dest) {
        if (other == mine) {
            return e;
        }
        if (other < mine) {
            return null;
        }
        if (first) {
            this.log.trace("%s: other conn_id (%d) > mine (%d); creating new receiver window", this.local_addr, other, mine);
            this.recv_table.remove(sender);
            return this.createReceiverEntry(sender, seqno, other, real_dest);
        }
        this.log.trace("%s: other conn_id (%d) > mine (%d) (!first); asking for first message", this.local_addr, other, mine);
        this.recv_table_lock.unlock();
        this.sendRequestForFirstSeqno(sender, real_dest);
        return null;
    }

    protected SenderEntry getSenderEntry(Address dst) {
        SenderEntry entry = this.send_table.get(dst);
        if (entry == null || entry.state() == State.CLOSED) {
            if (entry != null) {
                this.send_table.remove(dst, entry);
            }
            entry = this.send_table.computeIfAbsent(dst, k -> new SenderEntry(this.getNewConnectionId()));
            this.log.trace("%s: created sender window for %s (conn-id=%s)", this.local_addr, dst, entry.connId());
            if (this.cache != null && !this.members.contains(dst)) {
                this.cache.add(dst);
            }
        }
        return entry;
    }

    protected ReceiverEntry createReceiverEntry(Address sender, long seqno, short conn_id, Address dest) {
        ReceiverEntry entry = this.recv_table.computeIfAbsent(sender, k -> new ReceiverEntry(this.createBuffer(seqno - 1L), conn_id, dest));
        this.log.trace("%s: created receiver window for %s at seqno=#%d for conn-id=%d", this.local_addr, sender, seqno, conn_id);
        return entry;
    }

    protected void handleAckReceived(Address sender, long seqno, short conn_id, int timestamp) {
        Buffer win;
        SenderEntry entry;
        if (this.is_trace) {
            this.log.trace("%s <-- %s: ACK(#%d, conn-id=%d, ts=%d)", this.local_addr, sender, seqno, conn_id, timestamp);
        }
        if ((entry = this.send_table.get(sender)) != null && entry.connId() != conn_id) {
            this.log.trace("%s: my conn_id (%d) != received conn_id (%d); discarding ACK", this.local_addr, entry.connId(), conn_id);
            return;
        }
        Buffer buffer = win = entry != null ? entry.buf : null;
        if (win != null && entry.updateLastTimestamp(timestamp)) {
            win.purge(seqno, true);
            this.num_acks_received.increment();
        }
    }

    protected void handleResendingOfFirstMessage(Address sender, int timestamp) {
        Buffer win;
        this.log.trace("%s <-- %s: SEND_FIRST_SEQNO", this.local_addr, sender);
        SenderEntry entry = this.send_table.get(sender);
        Buffer buffer = win = entry != null ? entry.buf : null;
        if (win == null) {
            this.log.warn(Util.getMessage("SenderNotFound"), this.local_addr, sender);
            return;
        }
        if (!entry.updateLastTimestamp(timestamp)) {
            return;
        }
        Message rsp = (Message)win.get(win.low() + 1L);
        if (rsp != null) {
            Message copy = rsp.copy(true, true);
            UnicastHeader hdr = (UnicastHeader)copy.getHeader(this.id);
            UnicastHeader newhdr = hdr.copy();
            newhdr.first = true;
            copy.putHeader(this.id, newhdr).setFlag(Message.TransientFlag.DONT_BLOCK);
            this.resend(copy);
        }
    }

    protected void handleXmitRequest(Address sender, SeqnoList missing) {
        Buffer win;
        if (this.is_trace) {
            this.log.trace("%s <-- %s: XMIT(#%s)", this.local_addr, sender, missing);
        }
        SenderEntry entry = this.send_table.get(sender);
        this.xmit_reqs_received.add(missing.size());
        Buffer buffer = win = entry != null ? entry.buf : null;
        if (win == null) {
            return;
        }
        for (Long seqno : missing) {
            Message msg = (Message)win.get(seqno);
            if (msg == null) {
                if (!this.log.isWarnEnabled() || !this.log_not_found_msgs || this.local_addr.equals(sender) || seqno <= win.low()) continue;
                this.log.warn(Util.getMessage("MessageNotFound"), this.local_addr, sender, seqno);
                continue;
            }
            msg.setFlag(Message.TransientFlag.DONT_BLOCK);
            this.resend(msg);
            this.xmit_rsps_sent.increment();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean send(Message msg, SenderEntry entry, boolean dont_loopback_set) {
        long seqno;
        Lock lock;
        Buffer buf = entry.buf;
        short send_conn_id = entry.connId();
        Lock lock2 = lock = this.send_atomically ? buf.lock() : null;
        if (lock != null) {
            lock.lock();
        }
        try {
            seqno = entry.seqno.getAndIncrement();
            msg.putHeader(this.id, UnicastHeader.createDataHeader(seqno, send_conn_id, seqno == 1L));
            if (!this.addToSendBuffer(buf, seqno, msg, dont_loopback_set ? remove_filter : null)) {
                boolean bl = false;
                return bl;
            }
            this.down_prot.down(msg);
            if (entry.state() == State.CLOSING) {
                entry.state(State.OPEN);
            }
            if (this.conn_expiry_timeout > 0L) {
                entry.update();
            }
            if (dont_loopback_set) {
                buf.purge(buf.getHighestDeliverable());
            }
        }
        finally {
            if (lock != null) {
                lock.unlock();
            }
        }
        if (this.is_trace) {
            StringBuilder sb = new StringBuilder();
            sb.append(this.local_addr).append(" --> ").append(msg.dest()).append(": DATA(").append("#").append(seqno).append(", conn_id=").append(send_conn_id);
            if (seqno == 1L) {
                sb.append(", first");
            }
            sb.append(')');
            this.log.trace(sb);
        }
        return true;
    }

    protected boolean addToSendBuffer(Buffer<Message> win, long seq, Message msg, Predicate<Message> filter) {
        long sleep = 10L;
        boolean rc = false;
        while (true) {
            try {
                rc = win.add(seq, msg, filter);
            }
            catch (Throwable t) {
                if (!this.running) continue;
                Util.sleep(sleep);
                sleep = Math.min(5000L, sleep * 2L);
                if (this.running) continue;
            }
            break;
        }
        return rc;
    }

    protected void resend(Message msg) {
        this.down_prot.down(msg);
    }

    protected void deliverMessage(Message msg, Address sender, long seqno) {
        if (this.is_trace) {
            this.log.trace("%s: delivering %s#%s", this.local_addr, sender, seqno);
        }
        try {
            this.up_prot.up(msg);
        }
        catch (Throwable t) {
            this.log.warn(Util.getMessage("FailedToDeliverMsg"), this.local_addr, msg.isFlagSet(Message.Flag.OOB) ? "OOB message" : "message", msg, t);
        }
    }

    protected void deliverBatch(MessageBatch batch, Entry entry, Address original_dest) {
        try {
            if (batch.isEmpty()) {
                return;
            }
            if (this.is_trace) {
                Object first = batch.first();
                Object last = batch.last();
                StringBuilder sb = new StringBuilder(this.local_addr + ": delivering");
                if (first != null && last != null) {
                    UnicastHeader hdr1 = (UnicastHeader)first.getHeader(this.id);
                    UnicastHeader hdr2 = (UnicastHeader)last.getHeader(this.id);
                    if (hdr1 != null && hdr2 != null) {
                        sb.append(" #").append(hdr1.seqno).append(" - #").append(hdr2.seqno);
                    }
                }
                sb.append(" (" + batch.size()).append(" messages)");
                this.log.trace(sb);
            }
            if (this.needToSendAck(entry, batch.size())) {
                this.sendAck(batch.sender(), entry, original_dest);
            }
            this.up_prot.up(batch);
            if (this.stats) {
                this.avg_delivery_batch_size.add(batch.size());
            }
        }
        catch (Throwable t) {
            this.log.warn(Util.getMessage("FailedToDeliverMsg"), this.local_addr, "batch", batch, t);
        }
    }

    protected long getTimestamp() {
        return this.time_service.timestamp();
    }

    public void startRetransmitTask() {
        if (this.xmit_task == null || this.xmit_task.isDone()) {
            this.xmit_task = this.timer.scheduleWithFixedDelay(new RetransmitTask(), 0L, this.xmit_interval, TimeUnit.MILLISECONDS, this.sends_can_block);
        }
    }

    public void stopRetransmitTask() {
        if (this.xmit_task != null) {
            this.xmit_task.cancel(true);
            this.xmit_task = null;
        }
    }

    protected void sendAck(Address dst, Entry entry, Address real_dest) {
        if (!this.running) {
            return;
        }
        long seqno = entry.buf.highestDelivered();
        short conn_id = entry.connId();
        Message ack = new EmptyMessage(dst).setFlag(Message.TransientFlag.DONT_BLOCK).setFlag(Message.Flag.NO_FC).putHeader(this.id, UnicastHeader.createAckHeader(seqno, conn_id, this.timestamper.incrementAndGet()));
        if (real_dest != null && !Objects.equals(this.local_addr, real_dest)) {
            ack.setSrc(real_dest);
        }
        if (this.is_trace) {
            this.log.trace("%s --> %s: ACK(#%d)", this.local_addr, dst, seqno);
        }
        try {
            this.down_prot.down(ack);
            this.num_acks_sent.increment();
        }
        catch (Throwable t) {
            this.log.error(Util.getMessage("FailedSendingAck"), this.local_addr, seqno, dst, t);
        }
    }

    protected synchronized short getNewConnectionId() {
        short retval = this.last_conn_id;
        this.last_conn_id = this.last_conn_id == Short.MAX_VALUE || this.last_conn_id < 0 ? (short)0 : (short)(this.last_conn_id + 1);
        return retval;
    }

    protected void sendRequestForFirstSeqno(Address dest, Address original_dest) {
        if (this.last_sync_sent.addIfAbsentOrExpired(dest)) {
            Message msg = new EmptyMessage(dest).setFlag(Message.Flag.OOB, Message.Flag.NO_FC).setFlag(Message.TransientFlag.DONT_BLOCK).putHeader(this.id, UnicastHeader.createSendFirstSeqnoHeader(this.timestamper.incrementAndGet()));
            if (!Objects.equals(this.local_addr, original_dest)) {
                msg.setSrc(original_dest);
            }
            this.log.trace("%s --> %s: SEND_FIRST_SEQNO", this.local_addr, dest);
            this.down_prot.down(msg);
        }
    }

    public void sendClose(Address dest, short conn_id) {
        Message msg = new EmptyMessage(dest).putHeader(this.id, UnicastHeader.createCloseHeader(conn_id)).setFlag(Message.Flag.NO_FC);
        this.log.trace("%s --> %s: CLOSE(conn-id=%d)", this.local_addr, dest, conn_id);
        this.down_prot.down(msg);
    }

    @ManagedOperation(description="Closes connections that have been idle for more than conn_expiry_timeout ms")
    public void closeIdleConnections() {
        long age;
        Entry val;
        for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
            val = entry.getValue();
            if (val.state() != State.OPEN || (age = val.age()) < this.conn_expiry_timeout) continue;
            this.log.debug("%s: closing expired connection for %s (%d ms old) in send_table", this.local_addr, entry.getKey(), age);
            this.closeSendConnection(entry.getKey());
        }
        for (Map.Entry<Address, Entry> entry : this.recv_table.entrySet()) {
            val = (ReceiverEntry)entry.getValue();
            if (val.state() != State.OPEN || (age = val.age()) < this.conn_expiry_timeout) continue;
            this.log.debug("%s: closing expired connection for %s (%d ms old) in recv_table", this.local_addr, entry.getKey(), age);
            this.closeReceiveConnection(entry.getKey());
        }
    }

    @ManagedOperation(description="Removes connections that have been closed for more than conn_close_timeout ms")
    public int removeExpiredConnections() {
        long age;
        Entry val;
        int num_removed = 0;
        for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
            val = entry.getValue();
            if (val.state() == State.OPEN || (age = val.age()) < this.conn_close_timeout) continue;
            this.log.debug("%s: removing expired connection for %s (%d ms old) from send_table", this.local_addr, entry.getKey(), age);
            this.removeSendConnection(entry.getKey());
            ++num_removed;
        }
        for (Map.Entry<Address, Entry> entry : this.recv_table.entrySet()) {
            val = (ReceiverEntry)entry.getValue();
            if (val.state() == State.OPEN || (age = val.age()) < this.conn_close_timeout) continue;
            this.log.debug("%s: removing expired connection for %s (%d ms old) from recv_table", this.local_addr, entry.getKey(), age);
            this.removeReceiveConnection(entry.getKey());
            ++num_removed;
        }
        return num_removed;
    }

    @ManagedOperation(description="Removes send- and/or receive-connections whose state is not OPEN (CLOSING or CLOSED)")
    public int removeConnections(boolean remove_send_connections, boolean remove_receive_connections) {
        Entry val;
        int num_removed = 0;
        if (remove_send_connections) {
            for (Map.Entry<Address, Entry> entry : this.send_table.entrySet()) {
                val = (SenderEntry)entry.getValue();
                if (val.state() == State.OPEN) continue;
                this.log.debug("%s: removing connection for %s (%d ms old, state=%s) from send_table", new Object[]{this.local_addr, entry.getKey(), val.age(), val.state()});
                this.removeSendConnection(entry.getKey());
                ++num_removed;
            }
        }
        if (remove_receive_connections) {
            for (Map.Entry<Address, Entry> entry : this.recv_table.entrySet()) {
                val = (ReceiverEntry)entry.getValue();
                if (val.state() == State.OPEN) continue;
                this.log.debug("%s: removing expired connection for %s (%d ms old, state=%s) from recv_table", new Object[]{this.local_addr, entry.getKey(), val.age(), val.state()});
                this.removeReceiveConnection(entry.getKey());
                ++num_removed;
            }
        }
        return num_removed;
    }

    @ManagedOperation(description="Triggers the retransmission task")
    public void triggerXmit() {
        for (Map.Entry<Address, ReceiverEntry> entry : this.recv_table.entrySet()) {
            SeqnoList missing;
            Buffer win;
            Address target = entry.getKey();
            ReceiverEntry val = entry.getValue();
            Buffer buffer = win = val != null ? val.buf : null;
            if (win != null && val.needToSendAck()) {
                this.sendAck(target, val, val.realDest());
            }
            if (win != null && win.numMissing() > 0 && (missing = win.getMissing(this.max_xmit_req_size)) != null) {
                long highest = missing.getLast();
                Long prev_seqno = this.xmit_task_map.get(target);
                if (prev_seqno == null) {
                    this.xmit_task_map.put(target, highest);
                    continue;
                }
                missing.removeHigherThan(prev_seqno);
                if (highest > prev_seqno) {
                    this.xmit_task_map.put(target, highest);
                }
                if (missing.isEmpty()) continue;
                long highest_deliverable = win.getHighestDeliverable();
                long first = missing.getFirst();
                if (first < highest_deliverable) {
                    missing.removeLowerThan(highest_deliverable + 1L);
                }
                this.retransmit(missing, target, val.real_dest);
                continue;
            }
            if (this.xmit_task_map.isEmpty()) continue;
            this.xmit_task_map.remove(target);
        }
        for (SenderEntry val : this.send_table.values()) {
            long highest_sent;
            Buffer win = val != null ? val.buf : null;
            if (win == null) continue;
            long highest_acked = win.highestDelivered();
            if (highest_acked < (highest_sent = win.high()) && val.watermark[0] == highest_acked && val.watermark[1] == highest_sent) {
                Message highest_sent_msg = (Message)win.get(highest_sent);
                if (highest_sent_msg == null) continue;
                this.retransmit(highest_sent_msg);
                continue;
            }
            val.watermark(highest_acked, highest_sent);
        }
        if (this.conn_expiry_timeout > 0L) {
            this.closeIdleConnections();
        }
        if (this.conn_close_timeout > 0L) {
            this.removeExpiredConnections();
        }
    }

    @ManagedOperation(description="Sends ACKs immediately for entries which are marked as pending (ACK hasn't been sent yet)")
    public void sendPendingAcks() {
        for (Map.Entry<Address, ReceiverEntry> entry : this.recv_table.entrySet()) {
            Address target = entry.getKey();
            ReceiverEntry val = entry.getValue();
            Buffer win = val != null ? val.buf : null;
            if (win == null || !val.needToSendAck()) continue;
            this.sendAck(target, val, val.realDest());
        }
    }

    protected void update(Entry entry, int num_received) {
        if (this.conn_expiry_timeout > 0L) {
            entry.update();
        }
        if (entry.state() == State.CLOSING) {
            entry.state(State.OPEN);
        }
        this.num_msgs_received.add(num_received);
    }

    protected static int compare(int ts1, int ts2) {
        int diff = ts1 - ts2;
        return Integer.compare(diff, 0);
    }

    @SafeVarargs
    protected static int accumulate(ToIntFunction<Buffer<Message>> func, Collection<? extends Entry> ... entries) {
        return Stream.of(entries).flatMap(Collection::stream).map(entry -> entry.buf).filter(Objects::nonNull).mapToInt(func).sum();
    }

    protected static short max(short a, short b) {
        return a >= b ? a : b;
    }

    protected static Tuple<Long, Boolean> getLowestSeqno(short prot_id, List<LongTuple<Message>> list) {
        long lowest_seqno = -1L;
        boolean first = false;
        for (LongTuple<Message> tuple : list) {
            Message msg = tuple.getVal2();
            UnicastHeader hdr = (UnicastHeader)msg.getHeader(prot_id);
            if (lowest_seqno < 0L) {
                lowest_seqno = hdr.seqno;
                first = hdr.first;
                continue;
            }
            if (lowest_seqno <= hdr.seqno) continue;
            lowest_seqno = hdr.seqno;
            first = hdr.first;
        }
        return new Tuple<Long, Boolean>(lowest_seqno, first);
    }

    protected class RetransmitTask
    implements Runnable {
        protected RetransmitTask() {
        }

        @Override
        public void run() {
            ReliableUnicast.this.triggerXmit();
        }

        public String toString() {
            return ReliableUnicast.class.getSimpleName() + ": RetransmitTask (interval=" + ReliableUnicast.this.xmit_interval + " ms)";
        }
    }

    public final class ReceiverEntry
    extends Entry {
        private final Address real_dest;

        public ReceiverEntry(Buffer<Message> received_msgs, short recv_conn_id, Address real_dest) {
            super(recv_conn_id, received_msgs);
            this.real_dest = real_dest;
        }

        Address realDest() {
            return this.real_dest;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.buf != null) {
                sb.append(this.buf).append(", ");
            }
            sb.append("recv_conn_id=" + this.conn_id).append(" (" + this.age() / 1000L + " secs old) - " + this.state);
            if (this.send_ack.get()) {
                sb.append(" [ack pending]");
            }
            return sb.toString();
        }
    }

    protected final class SenderEntry
    extends Entry {
        final AtomicLong seqno;
        final long[] watermark;
        int last_timestamp;

        public SenderEntry(short send_conn_id) {
            super(send_conn_id, ReliableUnicast.this.createBuffer(0L));
            this.seqno = new AtomicLong(1L);
            this.watermark = new long[]{0L, 0L};
        }

        long[] watermark() {
            return this.watermark;
        }

        SenderEntry watermark(long ha, long hs) {
            this.watermark[0] = ha;
            this.watermark[1] = hs;
            return this;
        }

        private synchronized boolean updateLastTimestamp(int ts) {
            boolean success;
            if (this.last_timestamp == 0) {
                this.last_timestamp = ts;
                return true;
            }
            boolean bl = success = ReliableUnicast.compare(ts, this.last_timestamp) > 0;
            if (success) {
                this.last_timestamp = ts;
            }
            return success;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.buf != null) {
                sb.append(this.buf).append(", ");
            }
            sb.append("send_conn_id=" + this.conn_id).append(" (" + this.age() / 1000L + " secs old) - " + this.state);
            if (this.last_timestamp != 0) {
                sb.append(", last-ts: ").append(this.last_timestamp);
            }
            return sb.toString();
        }
    }

    protected abstract class Entry {
        protected final Buffer<Message> buf;
        protected final short conn_id;
        protected final AtomicLong timestamp = new AtomicLong(0L);
        protected volatile State state = State.OPEN;
        protected final AtomicBoolean send_ack = new AtomicBoolean();
        protected final AtomicInteger acks_sent = new AtomicInteger();

        protected Entry(short conn_id, Buffer<Message> buf) {
            this.conn_id = conn_id;
            this.buf = Objects.requireNonNull(buf);
            this.update();
        }

        public Buffer<Message> buf() {
            return this.buf;
        }

        public short connId() {
            return this.conn_id;
        }

        protected void update() {
            this.timestamp.set(ReliableUnicast.this.getTimestamp());
        }

        protected long age() {
            return TimeUnit.MILLISECONDS.convert(ReliableUnicast.this.getTimestamp() - this.timestamp.longValue(), TimeUnit.NANOSECONDS);
        }

        protected boolean needToSendAck() {
            return this.send_ack.compareAndSet(true, false);
        }

        protected Entry sendAck() {
            this.send_ack.compareAndSet(false, true);
            return this;
        }

        protected State state() {
            return this.state;
        }

        protected Entry state(State s) {
            if (this.state != s) {
                switch (this.state) {
                    case OPEN: {
                        if (s != State.CLOSED) break;
                        this.buf.open(false);
                        break;
                    }
                    case CLOSING: {
                        this.buf.open(s != State.CLOSED);
                        break;
                    }
                }
                this.state = s;
                this.update();
            }
            return this;
        }

        public boolean update(int num_acks, IntBinaryOperator op) {
            boolean should_send_ack;
            boolean bl = should_send_ack = this.acks_sent.accumulateAndGet(num_acks, op) == 0;
            if (should_send_ack) {
                return true;
            }
            this.sendAck();
            return false;
        }
    }

    protected static enum State {
        OPEN,
        CLOSING,
        CLOSED;

    }
}

