Hi
We are currently working in a heavily multi-threaded environment where, under normal conditions, the MPv3.Cache object holds a few thousand entries.
Our situation is made worse by some network elements that send duplicate response packets, which end up triggering the cache's popEntry method.
Our observation is that the current implementation, in which popEntry iterates over the key set to find the PduHandle associated with the message ID, is inefficient and ends up blocking most threads. Because popEntry is synchronized, every lookup holds the cache lock for a time proportional to the number of cached entries. We use a MultiThreadedMessageDispatcher, and in the jstack output we can see 99 or 100 out of 100 threads stuck in the popEntry method.
We are trying a modified Cache that adds a new map:
private final Map<MessageID, Integer> messageToPdu = new WeakHashMap<>(25);
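Since the MessageID objects should only be referenced by their StateReference, the new map's entries will be garbage collected along with them. Meanwhile, the map lets us find the PduHandle (via its transaction ID) with a single get, rather than by iterating over thousands of elements. The map stores the transaction ID as an Integer rather than the PduHandle itself, so that it does not keep the weak keys of the entries map strongly reachable.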
We would love to hear whether we are doing something wrong by sharing the MPv3 object across thousands of threads, and whether the approach below sounds like a reasonable improvement.
protected static class Cache {
/**
 * Cache entries keyed by their <code>PduHandle</code>. A <code>WeakHashMap</code> is used, so an entry can be
 * discarded once its <code>PduHandle</code> key is no longer strongly reachable; stored entries therefore have
 * their own <code>PduHandle</code> reference set to <code>null</code> (see <code>addEntry</code>).
 */
private Map<PduHandle, StateReference<?>> entries = new WeakHashMap<>(25);
/**
 * Index from every message ID of a cached entry to the transaction ID of that entry's <code>PduHandle</code>,
 * used by <code>popEntry</code> to find an entry without scanning <code>entries</code>. The value is an
 * <code>Integer</code> rather than the <code>PduHandle</code> itself so that this map does not keep the weak
 * keys of <code>entries</code> strongly reachable.
 * NOTE(review): the weak <code>MessageID</code> keys are assumed to be referenced only by their
 * <code>StateReference</code>; if anything else retains them, their index entries outlive the cache entry.
 */
private final Map<MessageID, Integer> messageToPdu = new WeakHashMap<>(25);
/**
 * Adds a <code>StateReference</code> to the cache. Once the entry has been stored, its <code>PduHandle</code> is
 * set to <code>null</code>, because the cache uses a <code>WeakHashMap</code> internally which uses the
 * <code>PduHandle</code> as key and the entry must not strongly reference its own key. If the new entry equals
 * an existing entry except for the message ID, the message IDs of the existing entry are added to the new
 * entry, which then replaces the existing one.
 * <p>
 * Every message ID of the stored entry is also indexed in <code>messageToPdu</code> by the transaction ID of its
 * <code>PduHandle</code>, so that <code>popEntry</code> can locate the entry with a single lookup. When an
 * existing entry with the same <code>PduHandle</code> is replaced by an unrelated entry, the index entries of
 * the replaced entry's message IDs are removed.
 *
 * @param entry
 *         the state reference to add.
 *
 * @return {@link SnmpConstants#SNMP_MP_DOUBLED_MESSAGE} if the entry already exists and {@link
 * SnmpConstants#SNMP_MP_OK} on success.
 */
public synchronized int addEntry(StateReference<?> entry) {
    if (logger.isDebugEnabled()) {
        logger.debug("Adding cache entry: " + entry);
    }
    StateReference<?> existing = entries.get(entry.getPduHandle());
    if (existing != null) {
        // reassign handle for comparison:
        existing.setPduHandle(entry.getPduHandle());
        if (existing.equals(entry)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Doubled message: " + entry);
            }
            // clear it again to remove strong self-reference
            existing.setPduHandle(null);
            return SnmpConstants.SNMP_MP_DOUBLED_MESSAGE;
        } else if (existing.equalsExceptMsgID(entry)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Adding previous message IDs " + existing.getMessageIDs() + " to new entry " + entry);
            }
            entry.addMessageIDs(existing.getMessageIDs());
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("New entry does not match existing, although request ID is the same " + entry + " != " + existing);
            }
            // The unrelated existing entry is replaced below, so its message IDs must no longer resolve to
            // this transaction ID; otherwise the index would point them at an entry that does not carry them.
            Integer replacedTransactionID = entry.getPduHandle().getTransactionID();
            MessageID replacedMsgID = existing.getMsgID();
            if ((replacedMsgID != null) && replacedTransactionID.equals(messageToPdu.get(replacedMsgID))) {
                messageToPdu.remove(replacedMsgID);
            }
            if (existing.getMessageIDs() != null) {
                for (MessageID id : existing.getMessageIDs()) {
                    if ((id != null) && replacedTransactionID.equals(messageToPdu.get(id))) {
                        messageToPdu.remove(id);
                    }
                }
            }
        }
        // clear it again to remove strong self-reference
        existing.setPduHandle(null);
    }
    // add it
    PduHandle key = entry.getPduHandle();
    // because we are using a weak hash map for the cache, we need to null out
    // our key from the entry.
    entry.setPduHandle(null);
    entries.put(key, entry);
    // Index all message IDs of this entry by its transaction ID. WeakHashMap.put keeps an already present
    // equal key object, so remove first to tie each mapping's lifetime to this entry's MessageID instance.
    // Null IDs are skipped because a null key would be held strongly by the WeakHashMap forever.
    Integer transactionID = key.getTransactionID();
    MessageID msgID = entry.getMsgID();
    if (msgID != null) {
        messageToPdu.remove(msgID);
        messageToPdu.put(msgID, transactionID);
    }
    if (entry.getMessageIDs() != null) {
        for (MessageID id : entry.getMessageIDs()) {
            if (id != null) {
                messageToPdu.remove(id);
                messageToPdu.put(id, transactionID);
            }
        }
    }
    return SnmpConstants.SNMP_MP_OK;
}
/**
 * Delete the cache entry with the supplied {@link PduHandle}.
 * <p>
 * The message ID index entries of the deleted entry are removed as well, but only those that still map to the
 * transaction ID of <code>pduHandle</code>. An index entry that has meanwhile been re-assigned to another cache
 * entry (e.g. because an equal message ID was added for it) is left intact.
 *
 * @param pduHandle
 *         a pduHandle.
 *
 * @return {@code true} if an entry has been deleted, {@code false} otherwise.
 */
public synchronized boolean deleteEntry(PduHandle pduHandle) {
    StateReference<?> e = entries.remove(pduHandle);
    if (e == null) {
        return false;
    }
    if (pduHandle != null) {
        Integer transactionID = pduHandle.getTransactionID();
        // remove all message ids associated with this StateReference object
        MessageID msgID = e.getMsgID();
        if ((msgID != null) && transactionID.equals(messageToPdu.get(msgID))) {
            messageToPdu.remove(msgID);
        }
        if (e.getMessageIDs() != null) {
            for (MessageID id : e.getMessageIDs()) {
                if ((id != null) && transactionID.equals(messageToPdu.get(id))) {
                    messageToPdu.remove(id);
                }
            }
        }
    }
    return true;
}
/**
 * Pop the cache entry with the supplied ID from the cache.
 * <p>
 * The entry is located through the <code>messageToPdu</code> index instead of by scanning all entries. It is only
 * removed when it actually matches <code>msgID</code>. In that case, all index entries of its message IDs that
 * map to its transaction ID are removed too. A stale index entry, whose cache entry is gone or does not carry
 * <code>msgID</code>, is discarded and {@code null} is returned without touching the cache.
 *
 * @param msgID
 *         a message ID.
 *
 * @return the {@link StateReference} with the given message ID, with its <code>PduHandle</code> restored, or
 * {@code null} if such an entry cannot be found. If an entry is returned, the same is removed from the cache.
 */
public synchronized StateReference<?> popEntry(int msgID) {
    // NOTE(review): this lookup relies on SimpleMessageID.equals/hashCode matching the MessageID instances
    // stored by addEntry (including any MessageID subclasses) - confirm.
    SimpleMessageID lookupKey = new SimpleMessageID(msgID);
    Integer transactionID = messageToPdu.get(lookupKey);
    if (transactionID == null) {
        return null;
    }
    // NOTE(review): this is a new PduHandle rebuilt from the transaction ID, not the original key instance;
    // it relies on PduHandle equality/hashCode being based on the transaction ID - confirm.
    PduHandle pduHandle = new PduHandle(transactionID);
    StateReference<?> e = entries.get(pduHandle);
    if ((e == null) || !e.isMatchingMessageID(msgID)) {
        // Stale index entry: never remove (or return) a cache entry that does not carry this message ID.
        messageToPdu.remove(lookupKey);
        return null;
    }
    entries.remove(pduHandle);
    // The entry leaves the cache, so none of its message IDs may resolve to it any more.
    MessageID ownMsgID = e.getMsgID();
    if ((ownMsgID != null) && transactionID.equals(messageToPdu.get(ownMsgID))) {
        messageToPdu.remove(ownMsgID);
    }
    if (e.getMessageIDs() != null) {
        for (MessageID id : e.getMessageIDs()) {
            if ((id != null) && transactionID.equals(messageToPdu.get(id))) {
                messageToPdu.remove(id);
            }
        }
    }
    // The handle was nulled when the entry was stored; give it back to the caller.
    e.setPduHandle(pduHandle);
    if (logger.isDebugEnabled()) {
        logger.debug("Removed cache entry: "+e);
    }
    return e;
}