public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address>
Modifier and Type | Class and Description |
---|---|
protected class |
UNICAST3.Entry |
protected class |
UNICAST3.ReceiverEntry |
protected class |
UNICAST3.RetransmitTask
Retransmitter task which periodically (every xmit_interval ms):
If any of the receiver windows have the ack flag set, clears the flag and sends an ack for the
highest delivered seqno to the sender
Checks all receiver windows for missing messages and asks senders for retransmission
For all sender windows, checks if highest acked (HA) < highest sent (HS).
|
protected class |
UNICAST3.SenderEntry |
protected static class |
UNICAST3.State |
after_creation_hook, down_prot, ergonomics, id, log, stack, stats, up_prot
Constructor and Description |
---|
UNICAST3() |
Modifier and Type | Method and Description |
---|---|
protected static int |
accumulate(ToIntFunction<Table> func,
Collection<? extends UNICAST3.Entry>... entries) |
void |
closeConnection(Address mbr)
Removes and resets from connection table (which is already locked).
|
void |
closeIdleConnections() |
void |
closeReceiveConnection(Address mbr) |
void |
closeSendConnection(Address mbr) |
protected static int |
compare(int ts1,
int ts2)
Compares 2 timestamps, handles numeric overflow
|
protected UNICAST3.ReceiverEntry |
createReceiverEntry(Address sender,
long seqno,
short conn_id) |
protected void |
deliverBatch(MessageBatch batch) |
protected void |
deliverMessage(Message msg,
Address sender,
long seqno) |
Object |
down(Event evt)
An event is to be sent down the stack.
|
Object |
down(Message msg)
A message is sent down the stack.
|
void |
expired(Address key)
Called by AgeOutCache, to remove expired connections
|
AgeOutCache<Address> |
getAgeOutCache() |
int |
getAgeOutCacheSize() |
String |
getAvgBatchDeliverySize() |
String |
getLocalAddress() |
long |
getMaxRetransmitTime() |
protected short |
getNewConnectionId() |
long |
getNumAcksReceived() |
long |
getNumAcksSent() |
int |
getNumConnections() |
long |
getNumMessagesReceived() |
long |
getNumMessagesSent() |
int |
getNumReceiveConnections() |
int |
getNumSendConnections() |
int |
getNumUnackedMessages()
The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet)
|
long |
getNumXmits() |
protected UNICAST3.ReceiverEntry |
getReceiverEntry(Address sender,
long seqno,
boolean first,
short conn_id) |
protected UNICAST3.SenderEntry |
getSenderEntry(Address dst) |
protected long |
getTimestamp() |
int |
getTimestamper() |
int |
getXmitTableDeliverableMessages() |
int |
getXmitTableMissingMessages() |
int |
getXmitTableNumCompactions() |
int |
getXmitTableNumMoves() |
int |
getXmitTableNumPurges() |
int |
getXmitTableNumResizes() |
int |
getXmitTableUndeliveredMessages() |
protected void |
handleAckReceived(Address sender,
long seqno,
short conn_id,
int timestamp)
Add the ACK to hashtable.sender.sent_msgs
|
protected void |
handleBatchFromSelf(MessageBatch batch,
UNICAST3.Entry entry) |
protected void |
handleBatchReceived(UNICAST3.ReceiverEntry entry,
Address sender,
List<LongTuple<Message>> msgs,
boolean oob) |
protected void |
handleDataReceived(Address sender,
long seqno,
short conn_id,
boolean first,
Message msg)
Check whether the hashtable contains an entry e for
sender (create if not). |
protected void |
handleDataReceivedFromSelf(Address sender,
long seqno,
Message msg)
Called when the sender of a message is the local member.
|
protected void |
handleResendingOfFirstMessage(Address sender,
int timestamp)
We need to resend the first message with our conn_id
|
protected void |
handleUpEvent(Address sender,
Message msg,
UnicastHeader3 hdr) |
protected void |
handleXmitRequest(Address sender,
SeqnoList missing) |
boolean |
hasSendConnectionTo(Address dest)
Used for testing only
|
void |
init()
Called after instance has been created (null constructor) and before protocol is started.
|
boolean |
isXmitTaskRunning() |
String |
printAgeOutCache() |
String |
printConnections() |
protected String |
printMessageList(List<LongTuple<Message>> list) |
String |
printReceiveWindowMessages() |
String |
printSendWindowMessages() |
protected void |
processInternalMessage(Table<Message> win,
Address sender) |
void |
removeAllConnections()
This method is public only so it can be invoked by unit tests, but should not otherwise be used!
|
protected void |
removeAndDeliver(Table<Message> win,
Address sender)
Try to remove as many messages as possible from the table and pass them up.
|
int |
removeConnections(boolean remove_send_connections,
boolean remove_receive_connections)
Removes send- and/or receive-connections whose state is not OPEN (CLOSING or CLOSED).
|
int |
removeExpiredConnections() |
protected void |
removeReceiveConnection(Address mbr) |
protected void |
removeSendConnection(Address mbr) |
void |
resetStats() |
protected void |
retransmit(Message msg)
Called by the sender to resend messages for which no ACK has been received yet
|
protected void |
retransmit(SeqnoList missing,
Address sender)
Sends a retransmit request to the given sender
|
protected void |
sendAck(Address dst,
long seqno,
short conn_id) |
void |
sendClose(Address dest,
short conn_id) |
void |
sendPendingAcks() |
protected void |
sendRequestForFirstSeqno(Address dest) |
<T extends Protocol> |
setLevel(String level)
Sets the level of a logger.
|
void |
setMaxRetransmitTime(long max_retransmit_time) |
void |
start()
This method is called on a
JChannel.connect(String). |
protected void |
startRetransmitTask() |
void |
stop()
This method is called on a
JChannel.disconnect(). |
protected void |
stopRetransmitTask() |
void |
triggerXmit() |
Object |
up(Message msg)
A single message was received.
|
void |
up(MessageBatch batch)
Sends up multiple messages in a
MessageBatch. |
protected void |
update(UNICAST3.Entry entry,
int num_received) |
accept, afterCreationHook, destroy, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, providedDownServices, providedUpServices, requiredDownServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setProperties, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled, up
protected static final long DEFAULT_FIRST_SEQNO
protected long conn_expiry_timeout
protected long conn_close_timeout
protected int xmit_table_num_rows
protected int xmit_table_msgs_per_row
protected double xmit_table_resize_factor
protected long xmit_table_max_compaction_time
protected long max_retransmit_time
protected long xmit_interval
protected boolean log_not_found_msgs
protected int ack_threshold
protected long sync_min_interval
protected int max_xmit_req_size
protected long num_msgs_sent
protected long num_msgs_received
protected long num_acks_sent
protected long num_acks_received
protected long num_xmits
protected final LongAdder xmit_reqs_received
protected final LongAdder xmit_reqs_sent
protected final LongAdder xmit_rsps_sent
protected final AverageMinMax avg_delivery_batch_size
protected boolean sends_can_block
protected boolean is_trace
protected final ConcurrentMap<Address,UNICAST3.SenderEntry> send_table
protected final ConcurrentMap<Address,UNICAST3.ReceiverEntry> recv_table
protected final ReentrantLock recv_table_lock
protected final Map<Address,Long> xmit_task_map
protected Future<?> xmit_task
protected Address local_addr
protected TimeScheduler timer
protected volatile boolean running
protected short last_conn_id
protected AgeOutCache<Address> cache
protected TimeService time_service
protected final AtomicInteger timestamper
protected ExpiryCache<Address> last_sync_sent
protected static final Message DUMMY_OOB_MSG
protected final Predicate<Message> drop_oob_and_dont_loopback_msgs_filter
protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR
public String getLocalAddress()
public int getNumSendConnections()
public int getNumReceiveConnections()
public int getNumConnections()
public int getTimestamper()
public String getAvgBatchDeliverySize()
public <T extends Protocol> T setLevel(String level)
Protocol
public String printConnections()
public long getNumMessagesSent()
public long getNumMessagesReceived()
public long getNumAcksSent()
public long getNumAcksReceived()
public long getNumXmits()
public long getMaxRetransmitTime()
public void setMaxRetransmitTime(long max_retransmit_time)
public boolean isXmitTaskRunning()
public int getAgeOutCacheSize()
public String printAgeOutCache()
public AgeOutCache<Address> getAgeOutCache()
public boolean hasSendConnectionTo(Address dest)
public int getNumUnackedMessages()
public int getXmitTableUndeliveredMessages()
public int getXmitTableMissingMessages()
public int getXmitTableDeliverableMessages()
public int getXmitTableNumCompactions()
public int getXmitTableNumMoves()
public int getXmitTableNumResizes()
public int getXmitTableNumPurges()
public String printReceiveWindowMessages()
public String printSendWindowMessages()
public void resetStats()
resetStats
in class Protocol
public void init() throws Exception
Protocol
public void start() throws Exception
Protocol
JChannel.connect(String)
. Starts work.
Protocols are connected and queues are ready to receive events.
Will be called from bottom to top. This call will replace
the START and START_OK events.
start
in class Protocol
Exception
- Thrown if protocol cannot be started successfully. This will cause the ProtocolStack
to fail, so JChannel.connect(String)
will throw an exception
public void stop()
Protocol
JChannel.disconnect()
. Stops work (e.g. by closing multicast socket).
Will be called from top to bottom. This means that at the time of the method invocation the
neighbor protocol below is still working. This method will replace the
STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that
when this method is called all messages in the down queue will have been flushed
public Object up(Message msg)
Protocol
protected void handleUpEvent(Address sender, Message msg, UnicastHeader3 hdr)
public void up(MessageBatch batch)
Protocol
MessageBatch
. The sender of the batch is always the same, and so is the
destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed
messages, although the transport itself will create initial MessageBatches that contain only either OOB or
regular messages.
The default processing below sends messages up the stack individually, based on matching criteria
(calling Protocol.accept(org.jgroups.Message)
), and - if true - calls Protocol.up(org.jgroups.Event)
for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.
Subclasses should check if there are any messages destined for them (e.g. using
MessageBatch.getMatchingMessages(short,boolean)
), then possibly remove and process them and finally pass
the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all
encrypted messages in the batch, not remove them, and pass the batch up when done.
protected void handleBatchFromSelf(MessageBatch batch, UNICAST3.Entry entry)
public Object down(Event evt)
Protocol
down_prot.down()
.
public Object down(Message msg)
Protocol
public void closeConnection(Address mbr)
public void closeSendConnection(Address mbr)
public void closeReceiveConnection(Address mbr)
protected void removeSendConnection(Address mbr)
protected void removeReceiveConnection(Address mbr)
public void removeAllConnections()
protected void retransmit(SeqnoList missing, Address sender)
protected void retransmit(Message msg)
public void expired(Address key)
expired
in interface AgeOutCache.Handler<Address>
key
-
protected void handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg)
sender
(create if not). If
e.received_msgs is null and first
is true: create a new AckReceiverWindow(seqno) and
add message. Set e.received_msgs to the new window. Else just add the message.
protected void handleDataReceivedFromSelf(Address sender, long seqno, Message msg)
protected void handleBatchReceived(UNICAST3.ReceiverEntry entry, Address sender, List<LongTuple<Message>> msgs, boolean oob)
protected void removeAndDeliver(Table<Message> win, Address sender)
protected UNICAST3.ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id)
protected UNICAST3.SenderEntry getSenderEntry(Address dst)
protected UNICAST3.ReceiverEntry createReceiverEntry(Address sender, long seqno, short conn_id)
protected void handleAckReceived(Address sender, long seqno, short conn_id, int timestamp)
protected void handleResendingOfFirstMessage(Address sender, int timestamp)
sender
-
protected void deliverBatch(MessageBatch batch)
protected long getTimestamp()
protected void startRetransmitTask()
protected void stopRetransmitTask()
protected void sendAck(Address dst, long seqno, short conn_id)
protected short getNewConnectionId()
protected void sendRequestForFirstSeqno(Address dest)
public void sendClose(Address dest, short conn_id)
public void closeIdleConnections()
public int removeExpiredConnections()
public int removeConnections(boolean remove_send_connections, boolean remove_receive_connections)
remove_send_connections
- If true, send connections whose state is !OPEN are destroyed and removed
remove_receive_connections
- If true, receive connections with state !OPEN are destroyed and removed
protected void update(UNICAST3.Entry entry, int num_received)
protected static int compare(int ts1, int ts2)
@SafeVarargs protected static int accumulate(ToIntFunction<Table> func, Collection<? extends UNICAST3.Entry>... entries)
public void triggerXmit()
public void sendPendingAcks()
Copyright © 2018 JBoss, a division of Red Hat. All rights reserved.