001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Properties;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Executors;
033import java.util.concurrent.Future;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.atomic.AtomicLong;
038
039import javax.management.ObjectName;
040
041import org.apache.activemq.DestinationDoesNotExistException;
042import org.apache.activemq.Service;
043import org.apache.activemq.advisory.AdvisoryBroker;
044import org.apache.activemq.advisory.AdvisorySupport;
045import org.apache.activemq.broker.BrokerService;
046import org.apache.activemq.broker.BrokerServiceAware;
047import org.apache.activemq.broker.ConnectionContext;
048import org.apache.activemq.broker.TransportConnection;
049import org.apache.activemq.broker.region.AbstractRegion;
050import org.apache.activemq.broker.region.DurableTopicSubscription;
051import org.apache.activemq.broker.region.Region;
052import org.apache.activemq.broker.region.RegionBroker;
053import org.apache.activemq.broker.region.Subscription;
054import org.apache.activemq.broker.region.policy.PolicyEntry;
055import org.apache.activemq.command.ActiveMQDestination;
056import org.apache.activemq.command.ActiveMQMessage;
057import org.apache.activemq.command.ActiveMQTempDestination;
058import org.apache.activemq.command.ActiveMQTopic;
059import org.apache.activemq.command.BrokerId;
060import org.apache.activemq.command.BrokerInfo;
061import org.apache.activemq.command.Command;
062import org.apache.activemq.command.ConnectionError;
063import org.apache.activemq.command.ConnectionId;
064import org.apache.activemq.command.ConnectionInfo;
065import org.apache.activemq.command.ConsumerId;
066import org.apache.activemq.command.ConsumerInfo;
067import org.apache.activemq.command.DataStructure;
068import org.apache.activemq.command.DestinationInfo;
069import org.apache.activemq.command.ExceptionResponse;
070import org.apache.activemq.command.KeepAliveInfo;
071import org.apache.activemq.command.Message;
072import org.apache.activemq.command.MessageAck;
073import org.apache.activemq.command.MessageDispatch;
074import org.apache.activemq.command.MessageId;
075import org.apache.activemq.command.NetworkBridgeFilter;
076import org.apache.activemq.command.ProducerInfo;
077import org.apache.activemq.command.RemoveInfo;
078import org.apache.activemq.command.RemoveSubscriptionInfo;
079import org.apache.activemq.command.Response;
080import org.apache.activemq.command.SessionInfo;
081import org.apache.activemq.command.ShutdownInfo;
082import org.apache.activemq.command.SubscriptionInfo;
083import org.apache.activemq.command.WireFormatInfo;
084import org.apache.activemq.filter.DestinationFilter;
085import org.apache.activemq.filter.MessageEvaluationContext;
086import org.apache.activemq.security.SecurityContext;
087import org.apache.activemq.transport.DefaultTransportListener;
088import org.apache.activemq.transport.FutureResponse;
089import org.apache.activemq.transport.ResponseCallback;
090import org.apache.activemq.transport.Transport;
091import org.apache.activemq.transport.TransportDisposedIOException;
092import org.apache.activemq.transport.TransportFilter;
093import org.apache.activemq.transport.nio.NIOSSLTransport;
094import org.apache.activemq.transport.failover.FailoverTransport;
095import org.apache.activemq.transport.tcp.SslTransport;
096import org.apache.activemq.util.IdGenerator;
097import org.apache.activemq.util.IntrospectionSupport;
098import org.apache.activemq.util.LongSequenceGenerator;
099import org.apache.activemq.util.MarshallingSupport;
100import org.apache.activemq.util.ServiceStopper;
101import org.apache.activemq.util.ServiceSupport;
102import org.slf4j.Logger;
103import org.slf4j.LoggerFactory;
104
105/**
106 * A useful base class for implementing demand forwarding bridges.
107 */
108public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
109    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
110    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
111    protected final Transport localBroker;
112    protected final Transport remoteBroker;
113    protected IdGenerator idGenerator = new IdGenerator();
114    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
115    protected ConnectionInfo localConnectionInfo;
116    protected ConnectionInfo remoteConnectionInfo;
117    protected SessionInfo localSessionInfo;
118    protected ProducerInfo producerInfo;
119    protected String remoteBrokerName = "Unknown";
120    protected String localClientId;
121    protected ConsumerInfo demandConsumerInfo;
122    protected int demandConsumerDispatched;
123    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
124    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
125    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
126    protected final AtomicBoolean disposed = new AtomicBoolean();
127    protected BrokerId localBrokerId;
128    protected ActiveMQDestination[] excludedDestinations;
129    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
130    protected ActiveMQDestination[] staticallyIncludedDestinations;
131    protected ActiveMQDestination[] durableDestinations;
132    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
133    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
134    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
135    protected final CountDownLatch startedLatch = new CountDownLatch(2);
136    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
137    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
138    protected NetworkBridgeConfiguration configuration;
139    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
140
141    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
142    protected BrokerId remoteBrokerId;
143
144    final AtomicLong enqueueCounter = new AtomicLong();
145    final AtomicLong dequeueCounter = new AtomicLong();
146
147    private NetworkBridgeListener networkBridgeListener;
148    private boolean createdByDuplex;
149    private BrokerInfo localBrokerInfo;
150    private BrokerInfo remoteBrokerInfo;
151
152    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
153    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
154
155    private final AtomicBoolean started = new AtomicBoolean();
156    private TransportConnection duplexInitiatingConnection;
157    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
158    protected BrokerService brokerService = null;
159    private ObjectName mbeanObjectName;
160    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
161    private Transport duplexInboundLocalBroker = null;
162    private ProducerInfo duplexInboundLocalProducerInfo;
163
164    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
165        this.configuration = configuration;
166        this.localBroker = localBroker;
167        this.remoteBroker = remoteBroker;
168    }
169
170    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
171        this.localBrokerInfo = localBrokerInfo;
172        this.remoteBrokerInfo = remoteBrokerInfo;
173        this.duplexInitiatingConnection = connection;
174        start();
175        serviceRemoteCommand(remoteBrokerInfo);
176    }
177
178    @Override
179    public void start() throws Exception {
180        if (started.compareAndSet(false, true)) {
181
182            if (brokerService == null) {
183                throw new IllegalArgumentException("BrokerService is null on " + this);
184            }
185
186            if (isDuplex()) {
187                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
188                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
189
190                    @Override
191                    public void onCommand(Object o) {
192                        Command command = (Command) o;
193                        serviceLocalCommand(command);
194                    }
195
196                    @Override
197                    public void onException(IOException error) {
198                        serviceLocalException(error);
199                    }
200                });
201                duplexInboundLocalBroker.start();
202            }
203
204            localBroker.setTransportListener(new DefaultTransportListener() {
205
206                @Override
207                public void onCommand(Object o) {
208                    Command command = (Command) o;
209                    serviceLocalCommand(command);
210                }
211
212                @Override
213                public void onException(IOException error) {
214                    if (!futureLocalBrokerInfo.isDone()) {
215                        LOG.info("error with pending local brokerInfo on: " + localBroker, error);
216                        futureLocalBrokerInfo.cancel(true);
217                        return;
218                    }
219                    serviceLocalException(error);
220                }
221            });
222
223            remoteBroker.setTransportListener(new DefaultTransportListener() {
224
225                @Override
226                public void onCommand(Object o) {
227                    Command command = (Command) o;
228                    serviceRemoteCommand(command);
229                }
230
231                @Override
232                public void onException(IOException error) {
233                    if (!futureRemoteBrokerInfo.isDone()) {
234                        LOG.info("error with pending remote brokerInfo on: " + remoteBroker, error);
235                        futureRemoteBrokerInfo.cancel(true);
236                        return;
237                    }
238                    serviceRemoteException(error);
239                }
240            });
241
242            remoteBroker.start();
243            localBroker.start();
244
245            if (!disposed.get()) {
246                try {
247                    triggerStartAsyncNetworkBridgeCreation();
248                } catch (IOException e) {
249                    LOG.warn("Caught exception from remote start", e);
250                }
251            } else {
252                LOG.warn("Bridge was disposed before the start() method was fully executed.");
253                throw new TransportDisposedIOException();
254            }
255        }
256    }
257
258    @Override
259    public void stop() throws Exception {
260        if (started.compareAndSet(true, false)) {
261            if (disposed.compareAndSet(false, true)) {
262                LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
263
264                futureRemoteBrokerInfo.cancel(true);
265                futureLocalBrokerInfo.cancel(true);
266
267                NetworkBridgeListener l = this.networkBridgeListener;
268                if (l != null) {
269                    l.onStop(this);
270                }
271                try {
272                    // local start complete
273                    if (startedLatch.getCount() < 2) {
274                        LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
275                                configuration.getBrokerName(), this, remoteBrokerName
276                        });
277                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
278                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
279                    }
280
281                    remoteBridgeStarted.set(false);
282                    final CountDownLatch sendShutdown = new CountDownLatch(1);
283
284                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
285                        @Override
286                        public void run() {
287                            try {
288                                serialExecutor.shutdown();
289                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
290                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
291                                    LOG.info("pending tasks on stop {}", pendingTasks);
292                                }
293                                localBroker.oneway(new ShutdownInfo());
294                                remoteBroker.oneway(new ShutdownInfo());
295                            } catch (Throwable e) {
296                                LOG.debug("Caught exception sending shutdown", e);
297                            } finally {
298                                sendShutdown.countDown();
299                            }
300
301                        }
302                    }, "ActiveMQ ForwardingBridge StopTask");
303
304                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
305                        LOG.info("Network Could not shutdown in a timely manner");
306                    }
307                } finally {
308                    ServiceStopper ss = new ServiceStopper();
309                    stopFailoverTransport(remoteBroker);
310                    ss.stop(remoteBroker);
311                    ss.stop(localBroker);
312                    ss.stop(duplexInboundLocalBroker);
313                    // Release the started Latch since another thread could be
314                    // stuck waiting for it to start up.
315                    startedLatch.countDown();
316                    startedLatch.countDown();
317                    localStartedLatch.countDown();
318
319                    ss.throwFirstException();
320                }
321            }
322
323            LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
324        }
325    }
326
327    private void stopFailoverTransport(Transport transport) {
328        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
329        if (failoverTransport != null) {
330            // may be blocked on write, in which case stop will block
331            try {
332                failoverTransport.handleTransportFailure(new IOException("Bridge stopped"));
333            } catch (InterruptedException ignored) {}
334        }
335    }
336
337    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
338        brokerService.getTaskRunnerFactory().execute(new Runnable() {
339            @Override
340            public void run() {
341                final String originalName = Thread.currentThread().getName();
342                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
343                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
344
345                try {
346                    // First we collect the info data from both the local and remote ends
347                    collectBrokerInfos();
348
349                    // Once we have all required broker info we can attempt to start
350                    // the local and then remote sides of the bridge.
351                    doStartLocalAndRemoteBridges();
352                } finally {
353                    Thread.currentThread().setName(originalName);
354                }
355            }
356        });
357    }
358
359    private void collectBrokerInfos() {
360
361        // First wait for the remote to feed us its BrokerInfo, then we can check on
362        // the LocalBrokerInfo and decide is this is a loop.
363        try {
364            remoteBrokerInfo = futureRemoteBrokerInfo.get();
365            if (remoteBrokerInfo == null) {
366                serviceLocalException(new Throwable("remoteBrokerInfo is null"));
367                return;
368            }
369        } catch (Exception e) {
370            serviceRemoteException(e);
371            return;
372        }
373
374        try {
375            localBrokerInfo = futureLocalBrokerInfo.get();
376            if (localBrokerInfo == null) {
377                serviceLocalException(new Throwable("localBrokerInfo is null"));
378                return;
379            }
380
381            // Before we try and build the bridge lets check if we are in a loop
382            // and if so just stop now before registering anything.
383            remoteBrokerId = remoteBrokerInfo.getBrokerId();
384            if (localBrokerId.equals(remoteBrokerId)) {
385                LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
386                        configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
387                });
388                ServiceSupport.dispose(localBroker);
389                ServiceSupport.dispose(remoteBroker);
390                // the bridge is left in a bit of limbo, but it won't get retried
391                // in this state.
392                return;
393            }
394
395            // Fill in the remote broker's information now.
396            remoteBrokerPath[0] = remoteBrokerId;
397            remoteBrokerName = remoteBrokerInfo.getBrokerName();
398            if (configuration.isUseBrokerNamesAsIdSeed()) {
399                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
400            }
401        } catch (Throwable e) {
402            serviceLocalException(e);
403        }
404    }
405
406    private void doStartLocalAndRemoteBridges() {
407
408        if (disposed.get()) {
409            return;
410        }
411
412        if (isCreatedByDuplex()) {
413            // apply remote (propagated) configuration to local duplex bridge before start
414            Properties props = null;
415            try {
416                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
417                IntrospectionSupport.getProperties(configuration, props, null);
418                if (configuration.getExcludedDestinations() != null) {
419                    excludedDestinations = configuration.getExcludedDestinations().toArray(
420                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
421                }
422                if (configuration.getStaticallyIncludedDestinations() != null) {
423                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
424                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
425                }
426                if (configuration.getDynamicallyIncludedDestinations() != null) {
427                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
428                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
429                }
430            } catch (Throwable t) {
431                LOG.error("Error mapping remote configuration: {}", props, t);
432            }
433        }
434
435        try {
436            startLocalBridge();
437        } catch (Throwable e) {
438            serviceLocalException(e);
439            return;
440        }
441
442        try {
443            startRemoteBridge();
444        } catch (Throwable e) {
445            serviceRemoteException(e);
446            return;
447        }
448
449        try {
450            if (safeWaitUntilStarted()) {
451                setupStaticDestinations();
452            }
453        } catch (Throwable e) {
454            serviceLocalException(e);
455        }
456    }
457
458    private void startLocalBridge() throws Throwable {
459        if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
460            synchronized (this) {
461                LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
462                if (!disposed.get()) {
463
464                    if (idGenerator == null) {
465                        throw new IllegalStateException("Id Generator cannot be null");
466                    }
467
468                    localConnectionInfo = new ConnectionInfo();
469                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
470                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
471                    localConnectionInfo.setClientId(localClientId);
472                    localConnectionInfo.setUserName(configuration.getUserName());
473                    localConnectionInfo.setPassword(configuration.getPassword());
474                    Transport originalTransport = remoteBroker;
475                    while (originalTransport instanceof TransportFilter) {
476                        originalTransport = ((TransportFilter) originalTransport).getNext();
477                    }
478                    setTransportContext(originalTransport, localConnectionInfo);
479                    // sync requests that may fail
480                    Object resp = localBroker.request(localConnectionInfo);
481                    if (resp instanceof ExceptionResponse) {
482                        throw ((ExceptionResponse) resp).getException();
483                    }
484                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
485                    localBroker.oneway(localSessionInfo);
486
487                    if (configuration.isDuplex()) {
488                        // separate in-bound channel for forwards so we don't
489                        // contend with out-bound dispatch on same connection
490                        remoteBrokerInfo.setNetworkConnection(true);
491                        duplexInboundLocalBroker.oneway(remoteBrokerInfo);
492
493                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
494                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
495                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
496                                + configuration.getBrokerName());
497                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
498                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
499
500                        setTransportContext(originalTransport, duplexLocalConnectionInfo);
501
502                        // sync requests that may fail
503                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
504                        if (resp instanceof ExceptionResponse) {
505                            throw ((ExceptionResponse) resp).getException();
506                        }
507                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
508                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
509                        duplexInboundLocalBroker.oneway(duplexInboundSession);
510                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
511                    }
512                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
513                    NetworkBridgeListener l = this.networkBridgeListener;
514                    if (l != null) {
515                        l.onStart(this);
516                    }
517
518                    // Let the local broker know the remote broker's ID.
519                    localBroker.oneway(remoteBrokerInfo);
520                    // new peer broker (a consumer can work with remote broker also)
521                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
522
523                    LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
524                            localBroker, remoteBroker, remoteBrokerName
525                    });
526                    LOG.trace("{} register bridge ({}) to {}", new Object[]{
527                            configuration.getBrokerName(), this, remoteBrokerName
528                    });
529                } else {
530                    LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
531                }
532                startedLatch.countDown();
533                localStartedLatch.countDown();
534            }
535        }
536    }
537
538    private void setTransportContext(Transport transport, ConnectionInfo connectionInfo) {
539        if (transport instanceof SslTransport) {
540            connectionInfo.setTransportContext(((SslTransport)transport).getPeerCertificates());
541        } else if (transport instanceof NIOSSLTransport) {
542            connectionInfo.setTransportContext(((NIOSSLTransport)transport).getPeerCertificates());
543        }
544    }
545
546    protected void startRemoteBridge() throws Exception {
547        if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
548            LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
549            synchronized (this) {
550                if (!isCreatedByDuplex()) {
551                    BrokerInfo brokerInfo = new BrokerInfo();
552                    brokerInfo.setBrokerName(configuration.getBrokerName());
553                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
554                    brokerInfo.setNetworkConnection(true);
555                    brokerInfo.setDuplexConnection(configuration.isDuplex());
556                    // set our properties
557                    Properties props = new Properties();
558                    IntrospectionSupport.getProperties(configuration, props, null);
559                    props.remove("networkTTL");
560                    String str = MarshallingSupport.propertiesToString(props);
561                    brokerInfo.setNetworkProperties(str);
562                    brokerInfo.setBrokerId(this.localBrokerId);
563                    remoteBroker.oneway(brokerInfo);
564                }
565                if (remoteConnectionInfo != null) {
566                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
567                }
568                remoteConnectionInfo = new ConnectionInfo();
569                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
570                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
571                remoteConnectionInfo.setUserName(configuration.getUserName());
572                remoteConnectionInfo.setPassword(configuration.getPassword());
573                remoteBroker.oneway(remoteConnectionInfo);
574
575                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
576                remoteBroker.oneway(remoteSessionInfo);
577                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
578                producerInfo.setResponseRequired(false);
579                remoteBroker.oneway(producerInfo);
580                // Listen to consumer advisory messages on the remote broker to determine demand.
581                if (!configuration.isStaticBridge()) {
582                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
583                    // always dispatch advisory message asynchronously so that
584                    // we never block the producer broker if we are slow
585                    demandConsumerInfo.setDispatchAsync(true);
586                    String advisoryTopic = configuration.getDestinationFilter();
587                    if (configuration.isBridgeTempDestinations()) {
588                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
589                    }
590                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
591                    demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
592                    remoteBroker.oneway(demandConsumerInfo);
593                }
594                startedLatch.countDown();
595            }
596        }
597    }
598
599    @Override
600    public void serviceRemoteException(Throwable error) {
601        if (!disposed.get()) {
602            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
603                LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
604                        localBroker, remoteBroker, error
605                });
606            } else {
607                LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
608                        localBroker, remoteBroker, error
609                });
610            }
611            LOG.debug("The remote Exception was: {}", error, error);
612            brokerService.getTaskRunnerFactory().execute(new Runnable() {
613                @Override
614                public void run() {
615                    ServiceSupport.dispose(getControllingService());
616                }
617            });
618            fireBridgeFailed(error);
619        }
620    }
621
622    protected void serviceRemoteCommand(Command command) {
623        if (!disposed.get()) {
624            try {
625                if (command.isMessageDispatch()) {
626                    safeWaitUntilStarted();
627                    MessageDispatch md = (MessageDispatch) command;
628                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
629                    ackAdvisory(md.getMessage());
630                } else if (command.isBrokerInfo()) {
631                    futureRemoteBrokerInfo.set((BrokerInfo) command);
632                } else if (command.getClass() == ConnectionError.class) {
633                    ConnectionError ce = (ConnectionError) command;
634                    serviceRemoteException(ce.getException());
635                } else {
636                    if (isDuplex()) {
637                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
638                        if (command.isMessage()) {
639                            final ActiveMQMessage message = (ActiveMQMessage) command;
640                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
641                                serviceRemoteConsumerAdvisory(message.getDataStructure());
642                                ackAdvisory(message);
643                            } else {
644                                if (!isPermissableDestination(message.getDestination(), true)) {
645                                    return;
646                                }
647                                // message being forwarded - we need to
648                                // propagate the response to our local send
649                                if (canDuplexDispatch(message)) {
650                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
651                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
652                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
653                                            final int correlationId = message.getCommandId();
654
655                                            @Override
656                                            public void onCompletion(FutureResponse resp) {
657                                                try {
658                                                    Response reply = resp.getResult();
659                                                    reply.setCorrelationId(correlationId);
660                                                    remoteBroker.oneway(reply);
661                                                } catch (IOException error) {
662                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
663                                                    serviceRemoteException(error);
664                                                }
665                                            }
666                                        });
667                                    } else {
668                                        duplexInboundLocalBroker.oneway(message);
669                                    }
670                                    serviceInboundMessage(message);
671                                } else {
672                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
673                                        Response reply = new Response();
674                                        reply.setCorrelationId(message.getCommandId());
675                                        remoteBroker.oneway(reply);
676                                    }
677                                }
678                            }
679                        } else {
680                            switch (command.getDataStructureType()) {
681                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
682                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
683                                        // end of initiating connection setup - propogate to initial connection to get mbean by clientid
684                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
685                                    } else {
686                                        localBroker.oneway(command);
687                                    }
688                                    break;
689                                case SessionInfo.DATA_STRUCTURE_TYPE:
690                                    localBroker.oneway(command);
691                                    break;
692                                case ProducerInfo.DATA_STRUCTURE_TYPE:
693                                    // using duplexInboundLocalProducerInfo
694                                    break;
695                                case MessageAck.DATA_STRUCTURE_TYPE:
696                                    MessageAck ack = (MessageAck) command;
697                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
698                                    if (localSub != null) {
699                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
700                                        localBroker.oneway(ack);
701                                    } else {
702                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
703                                    }
704                                    break;
705                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
706                                    localStartedLatch.await();
707                                    if (started.get()) {
708                                        final ConsumerInfo consumerInfo = (ConsumerInfo) command;
709                                        if (isDuplicateSuppressionOff(consumerInfo)) {
710                                            addConsumerInfo(consumerInfo);
711                                        } else {
712                                            synchronized (brokerService.getVmConnectorURI()) {
713                                                addConsumerInfo(consumerInfo);
714                                            }
715                                        }
716                                    } else {
717                                        // received a subscription whilst stopping
718                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
719                                    }
720                                    break;
721                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
722                                    // initiator is shutting down, controlled case
723                                    // abortive close dealt with by inactivity monitor
724                                    LOG.info("Stopping network bridge on shutdown of remote broker");
725                                    serviceRemoteException(new IOException(command.toString()));
726                                    break;
727                                default:
728                                    LOG.debug("Ignoring remote command: {}", command);
729                            }
730                        }
731                    } else {
732                        switch (command.getDataStructureType()) {
733                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
734                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
735                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
736                                break;
737                            default:
738                                LOG.warn("Unexpected remote command: {}", command);
739                        }
740                    }
741                }
742            } catch (Throwable e) {
743                LOG.debug("Exception processing remote command: {}", command, e);
744                serviceRemoteException(e);
745            }
746        }
747    }
748
749    private void ackAdvisory(Message message) throws IOException {
750        demandConsumerDispatched++;
751        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
752            final MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
753            ack.setConsumerId(demandConsumerInfo.getConsumerId());
754            brokerService.getTaskRunnerFactory().execute(new Runnable() {
755                @Override
756                public void run() {
757                    try {
758                        remoteBroker.oneway(ack);
759                    } catch (IOException e) {
760                        LOG.warn("Failed to send advisory ack " + ack, e);
761                    }
762                }
763            });
764            demandConsumerDispatched = 0;
765        }
766    }
767
768    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
769        final int networkTTL = configuration.getConsumerTTL();
770        if (data.getClass() == ConsumerInfo.class) {
771            // Create a new local subscription
772            ConsumerInfo info = (ConsumerInfo) data;
773            BrokerId[] path = info.getBrokerPath();
774
775            if (info.isBrowser()) {
776                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
777                return;
778            }
779
780            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
781                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
782                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
783                });
784                return;
785            }
786
787            if (contains(path, localBrokerPath[0])) {
788                // Ignore this consumer as it's a consumer we locally sent to the broker.
789                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
790                        configuration.getBrokerName(), remoteBrokerName, info
791                });
792                return;
793            }
794
795            if (!isPermissableDestination(info.getDestination())) {
796                // ignore if not in the permitted or in the excluded list
797                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
798                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
799                });
800                return;
801            }
802
803            // in a cyclic network there can be multiple bridges per broker that can propagate
804            // a network subscription so there is a need to synchronize on a shared entity
805            // if duplicate suppression is required
806            if (isDuplicateSuppressionOff(info)) {
807                addConsumerInfo(info);
808            } else {
809                synchronized (brokerService.getVmConnectorURI()) {
810                    addConsumerInfo(info);
811                }
812            }
813        } else if (data.getClass() == DestinationInfo.class) {
814            // It's a destination info - we want to pass up information about temporary destinations
815            final DestinationInfo destInfo = (DestinationInfo) data;
816            BrokerId[] path = destInfo.getBrokerPath();
817            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
818                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
819                        configuration.getBrokerName(), destInfo, networkTTL
820                });
821                return;
822            }
823            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
824                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
825                return;
826            }
827            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
828            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
829                // re-set connection id so comes from here
830                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
831                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
832            }
833            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
834            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
835                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
836            });
837            if (destInfo.isRemoveOperation()) {
838                // Serialize with removeSub operations such that all removeSub advisories
839                // are generated
840                serialExecutor.execute(new Runnable() {
841                    @Override
842                    public void run() {
843                        try {
844                            localBroker.oneway(destInfo);
845                        } catch (IOException e) {
846                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
847                        }
848                    }
849                });
850            } else {
851                localBroker.oneway(destInfo);
852            }
853        } else if (data.getClass() == RemoveInfo.class) {
854            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
855            removeDemandSubscription(id);
856        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
857            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
858            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
859            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
860                DemandSubscription ds = i.next();
861                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
862                if (removed) {
863                    if (ds.getDurableRemoteSubs().isEmpty()) {
864
865                        // deactivate subscriber
866                        RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
867                        localBroker.oneway(removeInfo);
868
869                        // remove subscriber
870                        RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
871                        sending.setClientId(localClientId);
872                        sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
873                        sending.setConnectionId(this.localConnectionInfo.getConnectionId());
874                        localBroker.oneway(sending);
875                    }
876                }
877            }
878        }
879    }
880
881    @Override
882    public void serviceLocalException(Throwable error) {
883        serviceLocalException(null, error);
884    }
885
886    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
887        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
888        if (!disposed.get()) {
889            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
890                // not a reason to terminate the bridge - temps can disappear with
891                // pending sends as the demand sub may outlive the remote dest
892                if (messageDispatch != null) {
893                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
894                    try {
895                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
896                        poisonAck.setPoisonCause(error);
897                        localBroker.oneway(poisonAck);
898                    } catch (IOException ioe) {
899                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
900                    }
901                    fireFailedForwardAdvisory(messageDispatch, error);
902                } else {
903                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
904                }
905                return;
906            }
907
908            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
909            LOG.debug("The local Exception was: {}", error, error);
910
911            brokerService.getTaskRunnerFactory().execute(new Runnable() {
912                @Override
913                public void run() {
914                    ServiceSupport.dispose(getControllingService());
915                }
916            });
917            fireBridgeFailed(error);
918        }
919    }
920
921    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
922        if (configuration.isAdvisoryForFailedForward()) {
923            AdvisoryBroker advisoryBroker = null;
924            try {
925                advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
926
927                if (advisoryBroker != null) {
928                    ConnectionContext context = new ConnectionContext();
929                    context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
930                    context.setBroker(brokerService.getBroker());
931
932                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
933                    advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
934                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
935                            advisoryMessage);
936
937                }
938            } catch (Exception e) {
939                LOG.warn("failed to fire forward failure advisory, cause: {}", e);
940                LOG.debug("detail", e);
941            }
942        }
943    }
944
945    protected Service getControllingService() {
946        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
947    }
948
949    protected void addSubscription(DemandSubscription sub) throws IOException {
950        if (sub != null) {
951            localBroker.oneway(sub.getLocalInfo());
952        }
953    }
954
955    protected void removeSubscription(final DemandSubscription sub) throws IOException {
956        if (sub != null) {
957            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
958
959            // ensure not available for conduit subs pending removal
960            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
961            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
962
963            // continue removal in separate thread to free up this thread for outstanding responses
964            // Serialize with removeDestination operations so that removeSubs are serialized with
965            // removeDestinations such that all removeSub advisories are generated
966            serialExecutor.execute(new Runnable() {
967                @Override
968                public void run() {
969                    sub.waitForCompletion();
970                    try {
971                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
972                    } catch (IOException e) {
973                        LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
974                    }
975                }
976            });
977        }
978    }
979
980    protected Message configureMessage(MessageDispatch md) throws IOException {
981        Message message = md.getMessage().copy();
982        // Update the packet to show where it came from.
983        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
984        message.setProducerId(producerInfo.getProducerId());
985        message.setDestination(md.getDestination());
986        message.setMemoryUsage(null);
987        if (message.getOriginalTransactionId() == null) {
988            message.setOriginalTransactionId(message.getTransactionId());
989        }
990        message.setTransactionId(null);
991        if (configuration.isUseCompression()) {
992            message.compress();
993        }
994        return message;
995    }
996
997    protected void serviceLocalCommand(Command command) {
998        if (!disposed.get()) {
999            try {
1000                if (command.isMessageDispatch()) {
1001                    safeWaitUntilStarted();
1002                    enqueueCounter.incrementAndGet();
1003                    final MessageDispatch md = (MessageDispatch) command;
1004                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
1005                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
1006
1007                        if (suppressMessageDispatch(md, sub)) {
1008                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
1009                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
1010                            });
1011                            // still ack as it may be durable
1012                            try {
1013                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1014                            } finally {
1015                                sub.decrementOutstandingResponses();
1016                            }
1017                            return;
1018                        }
1019
1020                        Message message = configureMessage(md);
1021                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
1022                                configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), message
1023                        });
1024
1025                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
1026                            try {
1027                                // never request b/c they are eventually acked async
1028                                remoteBroker.oneway(message);
1029                            } finally {
1030                                sub.decrementOutstandingResponses();
1031                            }
1032                            return;
1033                        }
1034
1035                        if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
1036
1037                            // The message was not sent using async send, so we should only
1038                            // ack the local broker when we get confirmation that the remote
1039                            // broker has received the message.
1040                            remoteBroker.asyncRequest(message, new ResponseCallback() {
1041                                @Override
1042                                public void onCompletion(FutureResponse future) {
1043                                    try {
1044                                        Response response = future.getResult();
1045                                        if (response.isException()) {
1046                                            ExceptionResponse er = (ExceptionResponse) response;
1047                                            serviceLocalException(md, er.getException());
1048                                        } else {
1049                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1050                                            dequeueCounter.incrementAndGet();
1051                                        }
1052                                    } catch (IOException e) {
1053                                        serviceLocalException(md, e);
1054                                    } finally {
1055                                        sub.decrementOutstandingResponses();
1056                                    }
1057                                }
1058                            });
1059
1060                        } else {
1061                            // If the message was originally sent using async send, we will
1062                            // preserve that QOS by bridging it using an async send (small chance
1063                            // of message loss).
1064                            try {
1065                                remoteBroker.oneway(message);
1066                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1067                                dequeueCounter.incrementAndGet();
1068                            } finally {
1069                                sub.decrementOutstandingResponses();
1070                            }
1071                        }
1072                        serviceOutbound(message);
1073                    } else {
1074                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
1075                    }
1076                } else if (command.isBrokerInfo()) {
1077                    futureLocalBrokerInfo.set((BrokerInfo) command);
1078                } else if (command.isShutdownInfo()) {
1079                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
1080                    stop();
1081                } else if (command.getClass() == ConnectionError.class) {
1082                    ConnectionError ce = (ConnectionError) command;
1083                    serviceLocalException(ce.getException());
1084                } else {
1085                    switch (command.getDataStructureType()) {
1086                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
1087                            break;
1088                        default:
1089                            LOG.warn("Unexpected local command: {}", command);
1090                    }
1091                }
1092            } catch (Throwable e) {
1093                LOG.warn("Caught an exception processing local command", e);
1094                serviceLocalException(e);
1095            }
1096        }
1097    }
1098
1099    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1100        boolean suppress = false;
1101        // for durable subs, suppression via filter leaves dangling acks so we
1102        // need to check here and allow the ack irrespective
1103        if (sub.getLocalInfo().isDurable()) {
1104            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
1105            messageEvalContext.setMessageReference(md.getMessage());
1106            messageEvalContext.setDestination(md.getDestination());
1107            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1108            //AMQ-6465 - Need to decrement the reference count after checking matches() as
1109            //the call above will increment the reference count by 1
1110            messageEvalContext.getMessageReference().decrementReferenceCount();
1111        }
1112        return suppress;
1113    }
1114
1115    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1116        if (brokerPath != null) {
1117            for (BrokerId id : brokerPath) {
1118                if (brokerId.equals(id)) {
1119                    return true;
1120                }
1121            }
1122        }
1123        return false;
1124    }
1125
1126    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1127        if (brokerPath == null || brokerPath.length == 0) {
1128            return pathsToAppend;
1129        }
1130        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1131        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1132        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1133        return rc;
1134    }
1135
1136    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1137        if (brokerPath == null || brokerPath.length == 0) {
1138            return new BrokerId[]{idToAppend};
1139        }
1140        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1141        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1142        rc[brokerPath.length] = idToAppend;
1143        return rc;
1144    }
1145
1146    protected boolean isPermissableDestination(ActiveMQDestination destination) {
1147        return isPermissableDestination(destination, false);
1148    }
1149
1150    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1151        // Are we not bridging temporary destinations?
1152        if (destination.isTemporary()) {
1153            if (allowTemporary) {
1154                return true;
1155            } else {
1156                return configuration.isBridgeTempDestinations();
1157            }
1158        }
1159
1160        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1161        if (dests != null && dests.length > 0) {
1162            for (ActiveMQDestination dest : dests) {
1163                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1164                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1165                    return true;
1166                }
1167            }
1168        }
1169
1170        dests = excludedDestinations;
1171        if (dests != null && dests.length > 0) {
1172            for (ActiveMQDestination dest : dests) {
1173                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1174                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1175                    return false;
1176                }
1177            }
1178        }
1179
1180        dests = dynamicallyIncludedDestinations;
1181        if (dests != null && dests.length > 0) {
1182            for (ActiveMQDestination dest : dests) {
1183                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1184                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1185                    return true;
1186                }
1187            }
1188
1189            return false;
1190        }
1191        return true;
1192    }
1193
1194    /**
1195     * Subscriptions for these destinations are always created
1196     */
1197    protected void setupStaticDestinations() {
1198        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1199        if (dests != null) {
1200            for (ActiveMQDestination dest : dests) {
1201                DemandSubscription sub = createDemandSubscription(dest);
1202                sub.setStaticallyIncluded(true);
1203                try {
1204                    addSubscription(sub);
1205                } catch (IOException e) {
1206                    LOG.error("Failed to add static destination {}", dest, e);
1207                }
1208                LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
1209            }
1210        }
1211    }
1212
1213    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1214        ConsumerInfo info = consumerInfo.copy();
1215        addRemoteBrokerToBrokerPath(info);
1216        DemandSubscription sub = createDemandSubscription(info);
1217        if (sub != null) {
1218            if (duplicateSuppressionIsRequired(sub)) {
1219                undoMapRegistration(sub);
1220            } else {
1221                if (consumerInfo.isDurable()) {
1222                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
1223                }
1224                addSubscription(sub);
1225                LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
1226            }
1227        }
1228    }
1229
1230    private void undoMapRegistration(DemandSubscription sub) {
1231        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1232        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1233    }
1234
1235    /*
1236     * check our existing subs networkConsumerIds against the list of network
1237     * ids in this subscription A match means a duplicate which we suppress for
1238     * topics and maybe for queues
1239     */
1240    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1241        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1242        boolean suppress = false;
1243
1244        if (isDuplicateSuppressionOff(consumerInfo)) {
1245            return suppress;
1246        }
1247
1248        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1249        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1250        for (Subscription sub : currentSubs) {
1251            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1252            if (!networkConsumers.isEmpty()) {
1253                if (matchFound(candidateConsumers, networkConsumers)) {
1254                    if (isInActiveDurableSub(sub)) {
1255                        suppress = false;
1256                    } else {
1257                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1258                    }
1259                    break;
1260                }
1261            }
1262        }
1263        return suppress;
1264    }
1265
1266    private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) {
1267        return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions()
1268                || consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()
1269                || consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions();
1270    }
1271
1272    private boolean isInActiveDurableSub(Subscription sub) {
1273        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1274    }
1275
1276    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1277        boolean suppress = false;
1278
1279        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1280            LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
1281                    configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
1282            });
1283            suppress = true;
1284        } else {
1285            // remove the existing lower priority duplicate and allow this candidate
1286            try {
1287                removeDuplicateSubscription(existingSub);
1288
1289                LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
1290                        configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
1291                });
1292            } catch (IOException e) {
1293                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
1294            }
1295        }
1296        return suppress;
1297    }
1298
1299    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1300        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1301            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1302                break;
1303            }
1304        }
1305    }
1306
1307    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1308        boolean found = false;
1309        for (ConsumerId aliasConsumer : networkConsumers) {
1310            if (candidateConsumers.contains(aliasConsumer)) {
1311                found = true;
1312                break;
1313            }
1314        }
1315        return found;
1316    }
1317
1318    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1319        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1320        Region region;
1321        Collection<Subscription> subs;
1322
1323        region = null;
1324        switch (dest.getDestinationType()) {
1325            case ActiveMQDestination.QUEUE_TYPE:
1326                region = region_broker.getQueueRegion();
1327                break;
1328            case ActiveMQDestination.TOPIC_TYPE:
1329                region = region_broker.getTopicRegion();
1330                break;
1331            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1332                region = region_broker.getTempQueueRegion();
1333                break;
1334            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1335                region = region_broker.getTempTopicRegion();
1336                break;
1337        }
1338
1339        if (region instanceof AbstractRegion) {
1340            subs = ((AbstractRegion) region).getSubscriptions().values();
1341        } else {
1342            subs = null;
1343        }
1344
1345        return subs;
1346    }
1347
1348    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1349        // add our original id to ourselves
1350        info.addNetworkConsumerId(info.getConsumerId());
1351        return doCreateDemandSubscription(info);
1352    }
1353
1354    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1355        DemandSubscription result = new DemandSubscription(info);
1356        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1357        if (info.getDestination().isTemporary()) {
1358            // reset the local connection Id
1359            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1360            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1361        }
1362
1363        if (configuration.isDecreaseNetworkConsumerPriority()) {
1364            byte priority = (byte) configuration.getConsumerPriorityBase();
1365            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1366                // The longer the path to the consumer, the less it's consumer priority.
1367                priority -= info.getBrokerPath().length + 1;
1368            }
1369            result.getLocalInfo().setPriority(priority);
1370            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
1371        }
1372        configureDemandSubscription(info, result);
1373        return result;
1374    }
1375
1376    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1377        ConsumerInfo info = new ConsumerInfo();
1378        info.setNetworkSubscription(true);
1379        info.setDestination(destination);
1380
1381        // Indicate that this subscription is being made on behalf of the remote broker.
1382        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
1383
1384        // the remote info held by the DemandSubscription holds the original
1385        // consumerId, the local info get's overwritten
1386        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1387        DemandSubscription result = null;
1388        try {
1389            result = createDemandSubscription(info);
1390        } catch (IOException e) {
1391            LOG.error("Failed to create DemandSubscription ", e);
1392        }
1393        return result;
1394    }
1395
1396    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1397        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
1398                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
1399            sub.getLocalInfo().setDispatchAsync(true);
1400        } else {
1401            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1402        }
1403        sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1404        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1405        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1406
1407        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1408        if (!info.isDurable()) {
1409            // This works for now since we use a VM connection to the local broker.
1410            // may need to change if we ever subscribe to a remote broker.
1411            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1412        } else {
1413            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
1414        }
1415    }
1416
1417    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1418        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1419        LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
1420                configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
1421        });
1422        if (sub != null) {
1423            removeSubscription(sub);
1424            LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
1425                    configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
1426            });
1427        }
1428    }
1429
1430    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1431        boolean removeDone = false;
1432        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1433        if (sub != null) {
1434            try {
1435                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1436                removeDone = true;
1437            } catch (IOException e) {
1438                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
1439            }
1440        }
1441        return removeDone;
1442    }
1443
1444    /**
1445     * Performs a timed wait on the started latch and then checks for disposed
1446     * before performing another wait each time the the started wait times out.
1447     */
1448    protected boolean safeWaitUntilStarted() throws InterruptedException {
1449        while (!disposed.get()) {
1450            if (startedLatch.await(1, TimeUnit.SECONDS)) {
1451                break;
1452            }
1453        }
1454        return !disposed.get();
1455    }
1456
1457    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1458        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1459        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1460            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1461            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1462                filterFactory = entry.getNetworkBridgeFilterFactory();
1463            }
1464        }
1465        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
1466    }
1467
1468    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1469        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1470    }
1471
1472    protected BrokerId[] getRemoteBrokerPath() {
1473        return remoteBrokerPath;
1474    }
1475
1476    @Override
1477    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1478        this.networkBridgeListener = listener;
1479    }
1480
1481    private void fireBridgeFailed(Throwable reason) {
1482        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
1483        NetworkBridgeListener l = this.networkBridgeListener;
1484        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1485            l.bridgeFailed();
1486        }
1487    }
1488
1489    /**
1490     * @return Returns the dynamicallyIncludedDestinations.
1491     */
1492    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1493        return dynamicallyIncludedDestinations;
1494    }
1495
1496    /**
1497     * @param dynamicallyIncludedDestinations
1498     *         The dynamicallyIncludedDestinations to set.
1499     */
1500    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1501        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1502    }
1503
1504    /**
1505     * @return Returns the excludedDestinations.
1506     */
1507    public ActiveMQDestination[] getExcludedDestinations() {
1508        return excludedDestinations;
1509    }
1510
1511    /**
1512     * @param excludedDestinations The excludedDestinations to set.
1513     */
1514    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1515        this.excludedDestinations = excludedDestinations;
1516    }
1517
1518    /**
1519     * @return Returns the staticallyIncludedDestinations.
1520     */
1521    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1522        return staticallyIncludedDestinations;
1523    }
1524
1525    /**
1526     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
1527     */
1528    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1529        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1530    }
1531
1532    /**
1533     * @return Returns the durableDestinations.
1534     */
1535    public ActiveMQDestination[] getDurableDestinations() {
1536        return durableDestinations;
1537    }
1538
1539    /**
1540     * @param durableDestinations The durableDestinations to set.
1541     */
1542    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1543        this.durableDestinations = durableDestinations;
1544    }
1545
1546    /**
1547     * @return Returns the localBroker.
1548     */
1549    public Transport getLocalBroker() {
1550        return localBroker;
1551    }
1552
1553    /**
1554     * @return Returns the remoteBroker.
1555     */
1556    public Transport getRemoteBroker() {
1557        return remoteBroker;
1558    }
1559
1560    /**
1561     * @return the createdByDuplex
1562     */
1563    public boolean isCreatedByDuplex() {
1564        return this.createdByDuplex;
1565    }
1566
1567    /**
1568     * @param createdByDuplex the createdByDuplex to set
1569     */
1570    public void setCreatedByDuplex(boolean createdByDuplex) {
1571        this.createdByDuplex = createdByDuplex;
1572    }
1573
1574    @Override
1575    public String getRemoteAddress() {
1576        return remoteBroker.getRemoteAddress();
1577    }
1578
1579    @Override
1580    public String getLocalAddress() {
1581        return localBroker.getRemoteAddress();
1582    }
1583
1584    @Override
1585    public String getRemoteBrokerName() {
1586        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1587    }
1588
1589    @Override
1590    public String getRemoteBrokerId() {
1591        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
1592    }
1593
1594    @Override
1595    public String getLocalBrokerName() {
1596        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1597    }
1598
1599    @Override
1600    public long getDequeueCounter() {
1601        return dequeueCounter.get();
1602    }
1603
1604    @Override
1605    public long getEnqueueCounter() {
1606        return enqueueCounter.get();
1607    }
1608
1609    protected boolean isDuplex() {
1610        return configuration.isDuplex() || createdByDuplex;
1611    }
1612
1613    public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1614        return subscriptionMapByRemoteId;
1615    }
1616
1617    @Override
1618    public void setBrokerService(BrokerService brokerService) {
1619        this.brokerService = brokerService;
1620        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1621        localBrokerPath[0] = localBrokerId;
1622    }
1623
1624    @Override
1625    public void setMbeanObjectName(ObjectName objectName) {
1626        this.mbeanObjectName = objectName;
1627    }
1628
1629    @Override
1630    public ObjectName getMbeanObjectName() {
1631        return mbeanObjectName;
1632    }
1633
1634    @Override
1635    public void resetStats() {
1636        enqueueCounter.set(0);
1637        dequeueCounter.set(0);
1638    }
1639
1640    /*
1641     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1642     * remote sides of the network bridge.
1643     */
1644    private static class FutureBrokerInfo implements Future<BrokerInfo> {
1645
1646        private final CountDownLatch slot = new CountDownLatch(1);
1647        private final AtomicBoolean disposed;
1648        private volatile BrokerInfo info = null;
1649
1650        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1651            this.info = info;
1652            this.disposed = disposed;
1653        }
1654
1655        @Override
1656        public boolean cancel(boolean mayInterruptIfRunning) {
1657            slot.countDown();
1658            return true;
1659        }
1660
1661        @Override
1662        public boolean isCancelled() {
1663            return slot.getCount() == 0 && info == null;
1664        }
1665
1666        @Override
1667        public boolean isDone() {
1668            return info != null;
1669        }
1670
1671        @Override
1672        public BrokerInfo get() throws InterruptedException, ExecutionException {
1673            try {
1674                if (info == null) {
1675                    while (!disposed.get()) {
1676                        if (slot.await(1, TimeUnit.SECONDS)) {
1677                            break;
1678                        }
1679                    }
1680                }
1681                return info;
1682            } catch (InterruptedException e) {
1683                Thread.currentThread().interrupt();
1684                LOG.debug("Operation interrupted: {}", e, e);
1685                throw new InterruptedException("Interrupted.");
1686            }
1687        }
1688
1689        @Override
1690        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1691            try {
1692                if (info == null) {
1693                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1694
1695                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
1696                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
1697                            break;
1698                        }
1699                    }
1700                    if (info == null) {
1701                        throw new TimeoutException();
1702                    }
1703                }
1704                return info;
1705            } catch (InterruptedException e) {
1706                throw new InterruptedException("Interrupted.");
1707            }
1708        }
1709
1710        public void set(BrokerInfo info) {
1711            this.info = info;
1712            this.slot.countDown();
1713        }
1714    }
1715
1716    protected void serviceOutbound(Message message) {
1717        NetworkBridgeListener l = this.networkBridgeListener;
1718        if (l != null) {
1719            l.onOutboundMessage(this, message);
1720        }
1721    }
1722
1723    protected void serviceInboundMessage(Message message) {
1724        NetworkBridgeListener l = this.networkBridgeListener;
1725        if (l != null) {
1726            l.onInboundMessage(this, message);
1727        }
1728    }
1729
1730    protected boolean canDuplexDispatch(Message message) {
1731        boolean result = true;
1732        if (configuration.isCheckDuplicateMessagesOnDuplex()){
1733            final long producerSequenceId = message.getMessageId().getProducerSequenceId();
1734            //  messages are multiplexed on this producer so we need to query the persistenceAdapter
1735            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
1736            if (producerSequenceId <= lastStoredForMessageProducer) {
1737                result = false;
1738                LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
1739                        (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
1740                });
1741            }
1742        }
1743        return result;
1744    }
1745
1746    protected long getStoredSequenceIdForMessage(MessageId messageId) {
1747        try {
1748            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
1749        } catch (IOException ignored) {
1750            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
1751        }
1752        return -1;
1753    }
1754
1755}