MPv3.Cache: slow performance with high concurrency and thousands of PDUs

Hi,
We are currently working in a heavily multi-threaded environment where, under normal conditions, the MPv3.Cache object holds a few thousand elements.
The situation is made worse by some network elements sending duplicate response packets, each of which ends up triggering the popEntry method in the cache.

Our observation is that the current implementation, in which popEntry iterates over the key set to find the PduHandle associated with the message ID, is inefficient and ends up blocking most threads. We use a MultiThreadedMessageDispatcher, and in the jstack dumps we can see 99 or 100 out of 100 threads stuck in the popEntry method.

We are experimenting with a modified Cache that adds a new map:
private final Map<MessageID, Integer> messageToPdu = new WeakHashMap<>(25);

Since each MessageID should only be referenced by its StateReference, these keys will be garbage collected as well, and the map lets us find the PduHandle with a single get rather than by iterating over thousands of elements.
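To illustrate the idea in isolation, here is a minimal sketch that contrasts the linear scan with a lookup through a secondary index. It is not the SNMP4J code: the class name MessageIndexSketch and its methods are hypothetical, plain ints stand in for PduHandle and MessageID, and ordinary HashMaps are used instead of WeakHashMaps so that the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class MessageIndexSketch {

    // Primary map: transaction ID -> message IDs recorded for that transaction.
    private final Map<Integer, List<Integer>> entries = new HashMap<>();
    // Secondary index: message ID -> transaction ID.
    private final Map<Integer, Integer> messageToTransaction = new HashMap<>();

    public synchronized void add(int transactionId, int messageId) {
        entries.computeIfAbsent(transactionId, k -> new ArrayList<>()).add(messageId);
        messageToTransaction.put(messageId, transactionId);
    }

    /** Linear scan: walks the entries until one contains the message ID. */
    public synchronized Integer popByScan(int messageId) {
        Iterator<Map.Entry<Integer, List<Integer>>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, List<Integer>> e = it.next();
            Integer transactionId = e.getKey();
            List<Integer> ids = e.getValue();
            if (ids.contains(messageId)) {
                it.remove();
                ids.forEach(messageToTransaction::remove); // keep the index consistent
                return transactionId;
            }
        }
        return null;
    }

    /** Indexed lookup: one get on the index, one remove on the primary map. */
    public synchronized Integer popByIndex(int messageId) {
        Integer transactionId = messageToTransaction.get(messageId);
        if (transactionId == null) {
            return null; // unknown or already popped, e.g. a duplicate response
        }
        List<Integer> ids = entries.remove(transactionId);
        if (ids != null) {
            ids.forEach(messageToTransaction::remove);
        }
        return transactionId;
    }

    public static void main(String[] args) {
        MessageIndexSketch cache = new MessageIndexSketch();
        for (int i = 0; i < 5000; i++) {
            cache.add(i, 100_000 + i);
        }
        System.out.println(cache.popByIndex(104_999)); // prints 4999
        System.out.println(cache.popByIndex(104_999)); // prints null (duplicate)
    }
}
```

Both methods hold the same lock, but the indexed variant holds it only for two hash lookups, whereas the scan holds it for a pass over up to all entries. A duplicate response is the worst case for the scan, because it finds nothing and therefore visits every entry.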

We would love to hear whether we are doing something wrong by sharing the MPv3 object across thousands of threads, and whether the approach below sounds like a reasonable improvement.

  protected static class Cache {

  private Map<PduHandle, StateReference<?>> entries = new WeakHashMap<>(25);

  private final Map<MessageID, Integer> messageToPdu = new WeakHashMap<>(25);
  
  /**
   * Adds a <code>StateReference</code> to the cache. The <code>PduHandle</code> of the supplied entry will be set
   * to <code>null</code> when the new entry is already part of the cache, because the
   * cache uses a <code>WeakHashMap</code> internally which uses the
   * <code>PduHandle</code> as key. If the new entry equals an existing entry
   * except for the message ID then the new message ID will be added to the existing entry.
   *
   * @param entry
   *         the state reference to add.
   *
   * @return {@link SnmpConstants#SNMP_MP_DOUBLED_MESSAGE} if the entry already exists and {@link
   * SnmpConstants#SNMP_MP_OK} on success.
   */
  public synchronized int addEntry(StateReference<?> entry) {
      if (logger.isDebugEnabled()) {
          logger.debug("Adding cache entry: " + entry);
      }
      StateReference<?> existing = entries.get(entry.getPduHandle());
      if (existing != null) {
          // reassign handle for comparison:
          existing.setPduHandle(entry.getPduHandle());
          if (existing.equals(entry)) {
              if (logger.isDebugEnabled()) {
                  logger.debug("Doubled message: " + entry);
              }
              // clear it again to remove strong self-reference
              existing.setPduHandle(null);
              return SnmpConstants.SNMP_MP_DOUBLED_MESSAGE;
          } else if (existing.equalsExceptMsgID(entry)) {
              if (logger.isDebugEnabled()) {
                  logger.debug("Adding previous message IDs " + existing.getMessageIDs() + " to new entry " + entry);
              }
              entry.addMessageIDs(existing.getMessageIDs());
          } else if (logger.isDebugEnabled()) {
              logger.debug("New entry does not match existing, although request ID is the same " + entry + " != " + existing);
          }
          // clear it again to remove strong self-reference
          existing.setPduHandle(null);
      }
      // add it
      PduHandle key = entry.getPduHandle();
      // because we are using a weak hash map for the cache, we need to null out
      // our key from the entry.
      entry.setPduHandle(null);
      entries.put(key, entry);
      // Add the mapping from all message ids to this PDU Handle
      messageToPdu.put(entry.getMsgID(), key.getTransactionID());
      if (entry.getMessageIDs() != null) {
          for (MessageID id: entry.getMessageIDs()) {
              messageToPdu.put(id, key.getTransactionID());
          }
      }

      return SnmpConstants.SNMP_MP_OK;
  }

  /**
   * Delete the cache entry with the supplied {@link PduHandle}.
   *
   * @param pduHandle
   *         a pduHandle.
   *
   * @return {@code true} if an entry has been deleted, {@code false} otherwise.
   */
  public synchronized boolean deleteEntry(PduHandle pduHandle) {
      StateReference<?> e = entries.remove(pduHandle);
      if (e != null) {
          // remove all message ids associated with this StateReference object
          messageToPdu.remove(e.getMsgID());
          if (e.getMessageIDs() != null) {
              for (MessageID id: e.getMessageIDs()) {
                  messageToPdu.remove(id);
              }
          }
      }
      return (e != null);
  }

  /**
   * Pop the cache entry with the supplied ID from the cache.
   *
   * @param msgID
   *         a message ID.
   *
   * @return a {@link CacheEntry} instance with the given message ID or {@code null} if such an entry cannot be
   * found. If a cache entry is returned, the same is removed from the cache.
   */
  public synchronized StateReference<?> popEntry(int msgID) {
      // Get the PDU given the message id
      Integer pdu = messageToPdu.get(new SimpleMessageID(msgID));
      if (pdu == null) {
          return null;
      }
      // Remove the entry by its PDU handle.
      PduHandle pduHandle = new PduHandle(pdu);
      StateReference<?> e = entries.remove(pduHandle);
      if ((e != null) && (e.isMatchingMessageID(msgID))) {
          // restore the handle that was nulled out when the entry was added
          e.setPduHandle(pduHandle);
          if (logger.isDebugEnabled()) {
              logger.debug("Removed cache entry: " + e);
          }
      }
      return e;
  }
  }

How many network elements (SNMP entities) are you polling with those thousands of threads? In my experience, fewer than 50 threads should be sufficient to poll more than 10,000 SNMP entities using asynchronous SNMP requests.
What is your experience?
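As a rough illustration of why a small pool can serve many entities when requests are asynchronous, here is a minimal sketch using only the JDK. It does not use the SNMP4J API: AsyncPollingSketch, sendAsync and pollAll are hypothetical names, and the "response" is simulated by a scheduled task with an assumed 5 ms delay instead of a real network round trip.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AsyncPollingSketch {

    /** Simulates one non-blocking request; the "response" arrives later on a pool thread. */
    static CompletableFuture<String> sendAsync(ScheduledExecutorService pool, int agentId) {
        CompletableFuture<String> response = new CompletableFuture<>();
        // Stand-in for network latency (assumed value); no thread blocks while waiting.
        pool.schedule(() -> response.complete("agent-" + agentId + ": ok"),
                5, TimeUnit.MILLISECONDS);
        return response;
    }

    /** Sends one request per agent using a fixed pool and returns the number of responses. */
    static int pollAll(int agents, int threads) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(threads);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int i = 0; i < agents; i++) {
                pending.add(sendAsync(pool, i)); // returns immediately
            }
            // Wait for all outstanding requests to complete.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            return (int) pending.stream()
                    .filter(f -> f.isDone() && !f.isCompletedExceptionally())
                    .count();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(pollAll(10_000, 4) + " responses received");
    }
}
```

The point of the sketch is only that the number of requests in flight is decoupled from the number of threads: a thread is needed when a response is processed, not while it is awaited.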

Independently of that, the proposed modification of the cache makes sense for your use case. I will pick up the idea in the 3.3 release.

Hi Frank,
thanks for the response, and glad to hear you agree on the cache improvements.
In our current setup we have a few thousand “upper layer” threads that call our SNMP proxy, which relies on snmp4j for the communication. Each such thread talks to one network element, so we collect concurrently from a few thousand network elements.
For the SNMP part we have:

  • a pool of 50 org.snmp4j.Snmp objects so we can send over multiple sockets.
  • a pool of 100 threads receiving in a MultiThreadedMessageDispatcher implementation.

OK, an MPv3 cache optimisation is already part of SNMP4J 2.8.0 and 3.3.1 released this week.