001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Properties;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Executors;
033import java.util.concurrent.Future;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.atomic.AtomicLong;
038
039import javax.management.ObjectName;
040
041import org.apache.activemq.DestinationDoesNotExistException;
042import org.apache.activemq.Service;
043import org.apache.activemq.advisory.AdvisoryBroker;
044import org.apache.activemq.advisory.AdvisorySupport;
045import org.apache.activemq.broker.BrokerService;
046import org.apache.activemq.broker.BrokerServiceAware;
047import org.apache.activemq.broker.ConnectionContext;
048import org.apache.activemq.broker.TransportConnection;
049import org.apache.activemq.broker.region.AbstractRegion;
050import org.apache.activemq.broker.region.DurableTopicSubscription;
051import org.apache.activemq.broker.region.Region;
052import org.apache.activemq.broker.region.RegionBroker;
053import org.apache.activemq.broker.region.Subscription;
054import org.apache.activemq.broker.region.policy.PolicyEntry;
055import org.apache.activemq.command.ActiveMQDestination;
056import org.apache.activemq.command.ActiveMQMessage;
057import org.apache.activemq.command.ActiveMQTempDestination;
058import org.apache.activemq.command.ActiveMQTopic;
059import org.apache.activemq.command.BrokerId;
060import org.apache.activemq.command.BrokerInfo;
061import org.apache.activemq.command.Command;
062import org.apache.activemq.command.ConnectionError;
063import org.apache.activemq.command.ConnectionId;
064import org.apache.activemq.command.ConnectionInfo;
065import org.apache.activemq.command.ConsumerId;
066import org.apache.activemq.command.ConsumerInfo;
067import org.apache.activemq.command.DataStructure;
068import org.apache.activemq.command.DestinationInfo;
069import org.apache.activemq.command.ExceptionResponse;
070import org.apache.activemq.command.KeepAliveInfo;
071import org.apache.activemq.command.Message;
072import org.apache.activemq.command.MessageAck;
073import org.apache.activemq.command.MessageDispatch;
074import org.apache.activemq.command.MessageId;
075import org.apache.activemq.command.NetworkBridgeFilter;
076import org.apache.activemq.command.ProducerInfo;
077import org.apache.activemq.command.RemoveInfo;
078import org.apache.activemq.command.RemoveSubscriptionInfo;
079import org.apache.activemq.command.Response;
080import org.apache.activemq.command.SessionInfo;
081import org.apache.activemq.command.ShutdownInfo;
082import org.apache.activemq.command.SubscriptionInfo;
083import org.apache.activemq.command.WireFormatInfo;
084import org.apache.activemq.filter.DestinationFilter;
085import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
086import org.apache.activemq.security.SecurityContext;
087import org.apache.activemq.transport.DefaultTransportListener;
088import org.apache.activemq.transport.FutureResponse;
089import org.apache.activemq.transport.ResponseCallback;
090import org.apache.activemq.transport.Transport;
091import org.apache.activemq.transport.TransportDisposedIOException;
092import org.apache.activemq.transport.TransportFilter;
093import org.apache.activemq.transport.nio.NIOSSLTransport;
094import org.apache.activemq.transport.failover.FailoverTransport;
095import org.apache.activemq.transport.tcp.SslTransport;
096import org.apache.activemq.util.IdGenerator;
097import org.apache.activemq.util.IntrospectionSupport;
098import org.apache.activemq.util.LongSequenceGenerator;
099import org.apache.activemq.util.MarshallingSupport;
100import org.apache.activemq.util.ServiceStopper;
101import org.apache.activemq.util.ServiceSupport;
102import org.slf4j.Logger;
103import org.slf4j.LoggerFactory;
104
105/**
106 * A useful base class for implementing demand forwarding bridges.
107 */
108public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
109    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
110    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
111    protected final Transport localBroker;
112    protected final Transport remoteBroker;
113    protected IdGenerator idGenerator = new IdGenerator();
114    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
115    protected ConnectionInfo localConnectionInfo;
116    protected ConnectionInfo remoteConnectionInfo;
117    protected SessionInfo localSessionInfo;
118    protected ProducerInfo producerInfo;
119    protected String remoteBrokerName = "Unknown";
120    protected String localClientId;
121    protected ConsumerInfo demandConsumerInfo;
122    protected int demandConsumerDispatched;
123    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
124    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
125    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
126    protected final AtomicBoolean disposed = new AtomicBoolean();
127    protected BrokerId localBrokerId;
128    protected ActiveMQDestination[] excludedDestinations;
129    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
130    protected ActiveMQDestination[] staticallyIncludedDestinations;
131    protected ActiveMQDestination[] durableDestinations;
132    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
133    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
134    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
135    protected final CountDownLatch startedLatch = new CountDownLatch(2);
136    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
137    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
138    protected NetworkBridgeConfiguration configuration;
139    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
140
141    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
142    protected BrokerId remoteBrokerId;
143
144    final AtomicLong enqueueCounter = new AtomicLong();
145    final AtomicLong dequeueCounter = new AtomicLong();
146
147    private NetworkBridgeListener networkBridgeListener;
148    private boolean createdByDuplex;
149    private BrokerInfo localBrokerInfo;
150    private BrokerInfo remoteBrokerInfo;
151
152    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
153    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
154
155    private final AtomicBoolean started = new AtomicBoolean();
156    private TransportConnection duplexInitiatingConnection;
157    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
158    protected BrokerService brokerService = null;
159    private ObjectName mbeanObjectName;
160    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
161    private Transport duplexInboundLocalBroker = null;
162    private ProducerInfo duplexInboundLocalProducerInfo;
163
164    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
165        this.configuration = configuration;
166        this.localBroker = localBroker;
167        this.remoteBroker = remoteBroker;
168    }
169
170    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
171        this.localBrokerInfo = localBrokerInfo;
172        this.remoteBrokerInfo = remoteBrokerInfo;
173        this.duplexInitiatingConnection = connection;
174        start();
175        serviceRemoteCommand(remoteBrokerInfo);
176    }
177
178    @Override
179    public void start() throws Exception {
180        if (started.compareAndSet(false, true)) {
181
182            if (brokerService == null) {
183                throw new IllegalArgumentException("BrokerService is null on " + this);
184            }
185
186            if (isDuplex()) {
187                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
188                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
189
190                    @Override
191                    public void onCommand(Object o) {
192                        Command command = (Command) o;
193                        serviceLocalCommand(command);
194                    }
195
196                    @Override
197                    public void onException(IOException error) {
198                        serviceLocalException(error);
199                    }
200                });
201                duplexInboundLocalBroker.start();
202            }
203
204            localBroker.setTransportListener(new DefaultTransportListener() {
205
206                @Override
207                public void onCommand(Object o) {
208                    Command command = (Command) o;
209                    serviceLocalCommand(command);
210                }
211
212                @Override
213                public void onException(IOException error) {
214                    if (!futureLocalBrokerInfo.isDone()) {
215                        LOG.info("error with pending local brokerInfo on: " + localBroker, error);
216                        futureLocalBrokerInfo.cancel(true);
217                        return;
218                    }
219                    serviceLocalException(error);
220                }
221            });
222
223            remoteBroker.setTransportListener(new DefaultTransportListener() {
224
225                @Override
226                public void onCommand(Object o) {
227                    Command command = (Command) o;
228                    serviceRemoteCommand(command);
229                }
230
231                @Override
232                public void onException(IOException error) {
233                    if (!futureRemoteBrokerInfo.isDone()) {
234                        LOG.info("error with pending remote brokerInfo on: " + remoteBroker, error);
235                        futureRemoteBrokerInfo.cancel(true);
236                        return;
237                    }
238                    serviceRemoteException(error);
239                }
240            });
241
242            remoteBroker.start();
243            localBroker.start();
244
245            if (!disposed.get()) {
246                try {
247                    triggerStartAsyncNetworkBridgeCreation();
248                } catch (IOException e) {
249                    LOG.warn("Caught exception from remote start", e);
250                }
251            } else {
252                LOG.warn("Bridge was disposed before the start() method was fully executed.");
253                throw new TransportDisposedIOException();
254            }
255        }
256    }
257
258    @Override
259    public void stop() throws Exception {
260        if (started.compareAndSet(true, false)) {
261            if (disposed.compareAndSet(false, true)) {
262                LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
263
264                futureRemoteBrokerInfo.cancel(true);
265                futureLocalBrokerInfo.cancel(true);
266
267                NetworkBridgeListener l = this.networkBridgeListener;
268                if (l != null) {
269                    l.onStop(this);
270                }
271                try {
272                    // local start complete
273                    if (startedLatch.getCount() < 2) {
274                        LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
275                                configuration.getBrokerName(), this, remoteBrokerName
276                        });
277                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
278                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
279                    }
280
281                    remoteBridgeStarted.set(false);
282                    final CountDownLatch sendShutdown = new CountDownLatch(1);
283
284                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
285                        @Override
286                        public void run() {
287                            try {
288                                serialExecutor.shutdown();
289                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
290                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
291                                    LOG.info("pending tasks on stop {}", pendingTasks);
292                                }
293                                localBroker.oneway(new ShutdownInfo());
294                                remoteBroker.oneway(new ShutdownInfo());
295                            } catch (Throwable e) {
296                                LOG.debug("Caught exception sending shutdown", e);
297                            } finally {
298                                sendShutdown.countDown();
299                            }
300
301                        }
302                    }, "ActiveMQ ForwardingBridge StopTask");
303
304                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
305                        LOG.info("Network Could not shutdown in a timely manner");
306                    }
307                } finally {
308                    ServiceStopper ss = new ServiceStopper();
309                    stopFailoverTransport(remoteBroker);
310                    ss.stop(remoteBroker);
311                    ss.stop(localBroker);
312                    ss.stop(duplexInboundLocalBroker);
313                    // Release the started Latch since another thread could be
314                    // stuck waiting for it to start up.
315                    startedLatch.countDown();
316                    startedLatch.countDown();
317                    localStartedLatch.countDown();
318
319                    ss.throwFirstException();
320                }
321            }
322
323            LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
324        }
325    }
326
327    private void stopFailoverTransport(Transport transport) {
328        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
329        if (failoverTransport != null) {
330            // may be blocked on write, in which case stop will block
331            try {
332                failoverTransport.handleTransportFailure(new IOException("Bridge stopped"));
333            } catch (InterruptedException ignored) {}
334        }
335    }
336
337    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
338        brokerService.getTaskRunnerFactory().execute(new Runnable() {
339            @Override
340            public void run() {
341                final String originalName = Thread.currentThread().getName();
342                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
343                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
344
345                try {
346                    // First we collect the info data from both the local and remote ends
347                    collectBrokerInfos();
348
349                    // Once we have all required broker info we can attempt to start
350                    // the local and then remote sides of the bridge.
351                    doStartLocalAndRemoteBridges();
352                } finally {
353                    Thread.currentThread().setName(originalName);
354                }
355            }
356        });
357    }
358
359    private void collectBrokerInfos() {
360
361        // First wait for the remote to feed us its BrokerInfo, then we can check on
362        // the LocalBrokerInfo and decide is this is a loop.
363        try {
364            remoteBrokerInfo = futureRemoteBrokerInfo.get();
365            if (remoteBrokerInfo == null) {
366                serviceLocalException(new Throwable("remoteBrokerInfo is null"));
367                return;
368            }
369        } catch (Exception e) {
370            serviceRemoteException(e);
371            return;
372        }
373
374        try {
375            localBrokerInfo = futureLocalBrokerInfo.get();
376            if (localBrokerInfo == null) {
377                serviceLocalException(new Throwable("localBrokerInfo is null"));
378                return;
379            }
380
381            // Before we try and build the bridge lets check if we are in a loop
382            // and if so just stop now before registering anything.
383            remoteBrokerId = remoteBrokerInfo.getBrokerId();
384            if (localBrokerId.equals(remoteBrokerId)) {
385                LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
386                        configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
387                });
388                ServiceSupport.dispose(localBroker);
389                ServiceSupport.dispose(remoteBroker);
390                // the bridge is left in a bit of limbo, but it won't get retried
391                // in this state.
392                return;
393            }
394
395            // Fill in the remote broker's information now.
396            remoteBrokerPath[0] = remoteBrokerId;
397            remoteBrokerName = remoteBrokerInfo.getBrokerName();
398            if (configuration.isUseBrokerNamesAsIdSeed()) {
399                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
400            }
401        } catch (Throwable e) {
402            serviceLocalException(e);
403        }
404    }
405
406    private void doStartLocalAndRemoteBridges() {
407
408        if (disposed.get()) {
409            return;
410        }
411
412        if (isCreatedByDuplex()) {
413            // apply remote (propagated) configuration to local duplex bridge before start
414            Properties props = null;
415            try {
416                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
417                IntrospectionSupport.getProperties(configuration, props, null);
418                if (configuration.getExcludedDestinations() != null) {
419                    excludedDestinations = configuration.getExcludedDestinations().toArray(
420                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
421                }
422                if (configuration.getStaticallyIncludedDestinations() != null) {
423                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
424                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
425                }
426                if (configuration.getDynamicallyIncludedDestinations() != null) {
427                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
428                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
429                }
430            } catch (Throwable t) {
431                LOG.error("Error mapping remote configuration: {}", props, t);
432            }
433        }
434
435        try {
436            startLocalBridge();
437        } catch (Throwable e) {
438            serviceLocalException(e);
439            return;
440        }
441
442        try {
443            startRemoteBridge();
444        } catch (Throwable e) {
445            serviceRemoteException(e);
446            return;
447        }
448
449        try {
450            if (safeWaitUntilStarted()) {
451                setupStaticDestinations();
452            }
453        } catch (Throwable e) {
454            serviceLocalException(e);
455        }
456    }
457
458    private void startLocalBridge() throws Throwable {
459        if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
460            synchronized (this) {
461                LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
462                if (!disposed.get()) {
463
464                    if (idGenerator == null) {
465                        throw new IllegalStateException("Id Generator cannot be null");
466                    }
467
468                    localConnectionInfo = new ConnectionInfo();
469                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
470                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
471                    localConnectionInfo.setClientId(localClientId);
472                    localConnectionInfo.setUserName(configuration.getUserName());
473                    localConnectionInfo.setPassword(configuration.getPassword());
474                    Transport originalTransport = remoteBroker;
475                    while (originalTransport instanceof TransportFilter) {
476                        originalTransport = ((TransportFilter) originalTransport).getNext();
477                    }
478                    setTransportContext(originalTransport, localConnectionInfo);
479                    // sync requests that may fail
480                    Object resp = localBroker.request(localConnectionInfo);
481                    if (resp instanceof ExceptionResponse) {
482                        throw ((ExceptionResponse) resp).getException();
483                    }
484                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
485                    localBroker.oneway(localSessionInfo);
486
487                    if (configuration.isDuplex()) {
488                        // separate in-bound channel for forwards so we don't
489                        // contend with out-bound dispatch on same connection
490                        remoteBrokerInfo.setNetworkConnection(true);
491                        duplexInboundLocalBroker.oneway(remoteBrokerInfo);
492
493                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
494                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
495                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
496                                + configuration.getBrokerName());
497                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
498                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
499
500                        setTransportContext(originalTransport, duplexLocalConnectionInfo);
501
502                        // sync requests that may fail
503                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
504                        if (resp instanceof ExceptionResponse) {
505                            throw ((ExceptionResponse) resp).getException();
506                        }
507                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
508                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
509                        duplexInboundLocalBroker.oneway(duplexInboundSession);
510                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
511                    }
512                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
513                    NetworkBridgeListener l = this.networkBridgeListener;
514                    if (l != null) {
515                        l.onStart(this);
516                    }
517
518                    // Let the local broker know the remote broker's ID.
519                    localBroker.oneway(remoteBrokerInfo);
520                    // new peer broker (a consumer can work with remote broker also)
521                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
522
523                    LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
524                            localBroker, remoteBroker, remoteBrokerName
525                    });
526                    LOG.trace("{} register bridge ({}) to {}", new Object[]{
527                            configuration.getBrokerName(), this, remoteBrokerName
528                    });
529                } else {
530                    LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
531                }
532                startedLatch.countDown();
533                localStartedLatch.countDown();
534            }
535        }
536    }
537
538    private void setTransportContext(Transport transport, ConnectionInfo connectionInfo) {
539        if (transport instanceof SslTransport) {
540            connectionInfo.setTransportContext(((SslTransport)transport).getPeerCertificates());
541        } else if (transport instanceof NIOSSLTransport) {
542            connectionInfo.setTransportContext(((NIOSSLTransport)transport).getPeerCertificates());
543        }
544    }
545
546    protected void startRemoteBridge() throws Exception {
547        if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
548            LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
549            synchronized (this) {
550                if (!isCreatedByDuplex()) {
551                    BrokerInfo brokerInfo = new BrokerInfo();
552                    brokerInfo.setBrokerName(configuration.getBrokerName());
553                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
554                    brokerInfo.setNetworkConnection(true);
555                    brokerInfo.setDuplexConnection(configuration.isDuplex());
556                    // set our properties
557                    Properties props = new Properties();
558                    IntrospectionSupport.getProperties(configuration, props, null);
559                    props.remove("networkTTL");
560                    String str = MarshallingSupport.propertiesToString(props);
561                    brokerInfo.setNetworkProperties(str);
562                    brokerInfo.setBrokerId(this.localBrokerId);
563                    remoteBroker.oneway(brokerInfo);
564                }
565                if (remoteConnectionInfo != null) {
566                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
567                }
568                remoteConnectionInfo = new ConnectionInfo();
569                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
570                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
571                remoteConnectionInfo.setUserName(configuration.getUserName());
572                remoteConnectionInfo.setPassword(configuration.getPassword());
573                remoteBroker.oneway(remoteConnectionInfo);
574
575                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
576                remoteBroker.oneway(remoteSessionInfo);
577                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
578                producerInfo.setResponseRequired(false);
579                remoteBroker.oneway(producerInfo);
580                // Listen to consumer advisory messages on the remote broker to determine demand.
581                if (!configuration.isStaticBridge()) {
582                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
583                    // always dispatch advisory message asynchronously so that
584                    // we never block the producer broker if we are slow
585                    demandConsumerInfo.setDispatchAsync(true);
586                    String advisoryTopic = configuration.getDestinationFilter();
587                    if (configuration.isBridgeTempDestinations()) {
588                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
589                    }
590                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
591                    demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
592                    remoteBroker.oneway(demandConsumerInfo);
593                }
594                startedLatch.countDown();
595            }
596        }
597    }
598
599    @Override
600    public void serviceRemoteException(Throwable error) {
601        if (!disposed.get()) {
602            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
603                LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
604                        localBroker, remoteBroker, error
605                });
606            } else {
607                LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
608                        localBroker, remoteBroker, error
609                });
610            }
611            LOG.debug("The remote Exception was: {}", error, error);
612            brokerService.getTaskRunnerFactory().execute(new Runnable() {
613                @Override
614                public void run() {
615                    ServiceSupport.dispose(getControllingService());
616                }
617            });
618            fireBridgeFailed(error);
619        }
620    }
621
622    protected void serviceRemoteCommand(Command command) {
623        if (!disposed.get()) {
624            try {
625                if (command.isMessageDispatch()) {
626                    safeWaitUntilStarted();
627                    MessageDispatch md = (MessageDispatch) command;
628                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
629                    ackAdvisory(md.getMessage());
630                } else if (command.isBrokerInfo()) {
631                    futureRemoteBrokerInfo.set((BrokerInfo) command);
632                } else if (command.getClass() == ConnectionError.class) {
633                    ConnectionError ce = (ConnectionError) command;
634                    serviceRemoteException(ce.getException());
635                } else {
636                    if (isDuplex()) {
637                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
638                        if (command.isMessage()) {
639                            final ActiveMQMessage message = (ActiveMQMessage) command;
640                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
641                                serviceRemoteConsumerAdvisory(message.getDataStructure());
642                                ackAdvisory(message);
643                            } else {
644                                if (!isPermissableDestination(message.getDestination(), true)) {
645                                    return;
646                                }
647                                safeWaitUntilStarted();
648                                // message being forwarded - we need to
649                                // propagate the response to our local send
650                                if (canDuplexDispatch(message)) {
651                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
652                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
653                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
654                                            final int correlationId = message.getCommandId();
655
656                                            @Override
657                                            public void onCompletion(FutureResponse resp) {
658                                                try {
659                                                    Response reply = resp.getResult();
660                                                    reply.setCorrelationId(correlationId);
661                                                    remoteBroker.oneway(reply);
662                                                } catch (IOException error) {
663                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
664                                                    serviceRemoteException(error);
665                                                }
666                                            }
667                                        });
668                                    } else {
669                                        duplexInboundLocalBroker.oneway(message);
670                                    }
671                                    serviceInboundMessage(message);
672                                } else {
673                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
674                                        Response reply = new Response();
675                                        reply.setCorrelationId(message.getCommandId());
676                                        remoteBroker.oneway(reply);
677                                    }
678                                }
679                            }
680                        } else {
681                            switch (command.getDataStructureType()) {
682                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
683                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
684                                        // end of initiating connection setup - propogate to initial connection to get mbean by clientid
685                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
686                                    } else {
687                                        localBroker.oneway(command);
688                                    }
689                                    break;
690                                case SessionInfo.DATA_STRUCTURE_TYPE:
691                                    localBroker.oneway(command);
692                                    break;
693                                case ProducerInfo.DATA_STRUCTURE_TYPE:
694                                    // using duplexInboundLocalProducerInfo
695                                    break;
696                                case MessageAck.DATA_STRUCTURE_TYPE:
697                                    MessageAck ack = (MessageAck) command;
698                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
699                                    if (localSub != null) {
700                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
701                                        localBroker.oneway(ack);
702                                    } else {
703                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
704                                    }
705                                    break;
706                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
707                                    localStartedLatch.await();
708                                    if (started.get()) {
709                                        final ConsumerInfo consumerInfo = (ConsumerInfo) command;
710                                        if (isDuplicateSuppressionOff(consumerInfo)) {
711                                            addConsumerInfo(consumerInfo);
712                                        } else {
713                                            synchronized (brokerService.getVmConnectorURI()) {
714                                                addConsumerInfo(consumerInfo);
715                                            }
716                                        }
717                                    } else {
718                                        // received a subscription whilst stopping
719                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
720                                    }
721                                    break;
722                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
723                                    // initiator is shutting down, controlled case
724                                    // abortive close dealt with by inactivity monitor
725                                    LOG.info("Stopping network bridge on shutdown of remote broker");
726                                    serviceRemoteException(new IOException(command.toString()));
727                                    break;
728                                default:
729                                    LOG.debug("Ignoring remote command: {}", command);
730                            }
731                        }
732                    } else {
733                        switch (command.getDataStructureType()) {
734                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
735                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
736                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
737                                break;
738                            default:
739                                LOG.warn("Unexpected remote command: {}", command);
740                        }
741                    }
742                }
743            } catch (Throwable e) {
744                LOG.debug("Exception processing remote command: {}", command, e);
745                serviceRemoteException(e);
746            }
747        }
748    }
749
750    private void ackAdvisory(Message message) throws IOException {
751        demandConsumerDispatched++;
752        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
753            final MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
754            ack.setConsumerId(demandConsumerInfo.getConsumerId());
755            brokerService.getTaskRunnerFactory().execute(new Runnable() {
756                @Override
757                public void run() {
758                    try {
759                        remoteBroker.oneway(ack);
760                    } catch (IOException e) {
761                        LOG.warn("Failed to send advisory ack " + ack, e);
762                    }
763                }
764            });
765            demandConsumerDispatched = 0;
766        }
767    }
768
769    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
770        final int networkTTL = configuration.getConsumerTTL();
771        if (data.getClass() == ConsumerInfo.class) {
772            // Create a new local subscription
773            ConsumerInfo info = (ConsumerInfo) data;
774            BrokerId[] path = info.getBrokerPath();
775
776            if (info.isBrowser()) {
777                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
778                return;
779            }
780
781            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
782                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
783                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
784                });
785                return;
786            }
787
788            if (contains(path, localBrokerPath[0])) {
789                // Ignore this consumer as it's a consumer we locally sent to the broker.
790                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
791                        configuration.getBrokerName(), remoteBrokerName, info
792                });
793                return;
794            }
795
796            if (!isPermissableDestination(info.getDestination())) {
797                // ignore if not in the permitted or in the excluded list
798                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
799                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
800                });
801                return;
802            }
803
804            // in a cyclic network there can be multiple bridges per broker that can propagate
805            // a network subscription so there is a need to synchronize on a shared entity
806            // if duplicate suppression is required
807            if (isDuplicateSuppressionOff(info)) {
808                addConsumerInfo(info);
809            } else {
810                synchronized (brokerService.getVmConnectorURI()) {
811                    addConsumerInfo(info);
812                }
813            }
814        } else if (data.getClass() == DestinationInfo.class) {
815            final DestinationInfo destInfo = (DestinationInfo) data;
816            BrokerId[] path = destInfo.getBrokerPath();
817            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
818                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
819                        configuration.getBrokerName(), destInfo, networkTTL
820                });
821                return;
822            }
823            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
824                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
825                return;
826            }
827            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
828            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
829                // re-set connection id so comes from here
830                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
831                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
832            }
833            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
834            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
835                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
836            });
837            if (destInfo.isRemoveOperation()) {
838                // not synced with addSubs so we will need to ignore any potential new subs with a timeout!=0
839                destInfo.setTimeout(1);
840            }
841            // Serialize both add/remove dest with removeSub operations such that all removeSub advisories are generated
842            serialExecutor.execute(new Runnable() {
843                @Override
844                public void run() {
845                    try {
846                        localBroker.oneway(destInfo);
847                    } catch (IOException e) {
848                        LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
849                    }
850                }
851            });
852
853        } else if (data.getClass() == RemoveInfo.class) {
854            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
855            removeDemandSubscription(id);
856        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
857            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
858            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
859            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
860                DemandSubscription ds = i.next();
861                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
862                if (removed) {
863                    if (ds.getDurableRemoteSubs().isEmpty()) {
864
865                        // deactivate subscriber
866                        RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
867                        localBroker.oneway(removeInfo);
868
869                        // remove subscriber
870                        RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
871                        sending.setClientId(localClientId);
872                        sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
873                        sending.setConnectionId(this.localConnectionInfo.getConnectionId());
874                        localBroker.oneway(sending);
875                    }
876                }
877            }
878        }
879    }
880
881    @Override
882    public void serviceLocalException(Throwable error) {
883        serviceLocalException(null, error);
884    }
885
886    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
887        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
888        if (!disposed.get()) {
889            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
890                // not a reason to terminate the bridge - temps can disappear with
891                // pending sends as the demand sub may outlive the remote dest
892                if (messageDispatch != null) {
893                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
894                    try {
895                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
896                        poisonAck.setPoisonCause(error);
897                        localBroker.oneway(poisonAck);
898                    } catch (IOException ioe) {
899                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
900                    }
901                    fireFailedForwardAdvisory(messageDispatch, error);
902                } else {
903                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
904                }
905                return;
906            }
907
908            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
909            LOG.debug("The local Exception was: {}", error, error);
910
911            brokerService.getTaskRunnerFactory().execute(new Runnable() {
912                @Override
913                public void run() {
914                    ServiceSupport.dispose(getControllingService());
915                }
916            });
917            fireBridgeFailed(error);
918        }
919    }
920
921    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
922        if (configuration.isAdvisoryForFailedForward()) {
923            AdvisoryBroker advisoryBroker = null;
924            try {
925                advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
926
927                if (advisoryBroker != null) {
928                    ConnectionContext context = new ConnectionContext();
929                    context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
930                    context.setBroker(brokerService.getBroker());
931
932                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
933                    advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
934                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
935                            advisoryMessage);
936
937                }
938            } catch (Exception e) {
939                LOG.warn("failed to fire forward failure advisory, cause: {}", e);
940                LOG.debug("detail", e);
941            }
942        }
943    }
944
945    protected Service getControllingService() {
946        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
947    }
948
949    protected void addSubscription(DemandSubscription sub) throws IOException {
950        if (sub != null) {
951            localBroker.oneway(sub.getLocalInfo());
952        }
953    }
954
955    protected void removeSubscription(final DemandSubscription sub) throws IOException {
956        if (sub != null) {
957            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
958
959            // ensure not available for conduit subs pending removal
960            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
961            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
962
963            // continue removal in separate thread to free up this thread for outstanding responses
964            // Serialize with removeDestination operations so that removeSubs are serialized with
965            // removeDestinations such that all removeSub advisories are generated
966            serialExecutor.execute(new Runnable() {
967                @Override
968                public void run() {
969                    sub.waitForCompletion();
970                    try {
971                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
972                    } catch (IOException e) {
973                        LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
974                    }
975                }
976            });
977        }
978    }
979
980    protected Message configureMessage(MessageDispatch md) throws IOException {
981        Message message = md.getMessage().copy();
982        // Update the packet to show where it came from.
983        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
984        message.setProducerId(producerInfo.getProducerId());
985        message.setDestination(md.getDestination());
986        message.setMemoryUsage(null);
987        if (message.getOriginalTransactionId() == null) {
988            message.setOriginalTransactionId(message.getTransactionId());
989        }
990        message.setTransactionId(null);
991        if (configuration.isUseCompression()) {
992            message.compress();
993        }
994        return message;
995    }
996
997    protected void serviceLocalCommand(Command command) {
998        if (!disposed.get()) {
999            try {
1000                if (command.isMessageDispatch()) {
1001                    safeWaitUntilStarted();
1002                    enqueueCounter.incrementAndGet();
1003                    final MessageDispatch md = (MessageDispatch) command;
1004                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
1005                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
1006
1007                        if (suppressMessageDispatch(md, sub)) {
1008                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
1009                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
1010                            });
1011                            // still ack as it may be durable
1012                            try {
1013                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1014                            } finally {
1015                                sub.decrementOutstandingResponses();
1016                            }
1017                            return;
1018                        }
1019
1020                        Message message = configureMessage(md);
1021                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
1022                                configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
1023                        });
1024                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
1025                            try {
1026                                // never request b/c they are eventually acked async
1027                                remoteBroker.oneway(message);
1028                            } finally {
1029                                sub.decrementOutstandingResponses();
1030                            }
1031                            return;
1032                        }
1033
1034                        if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
1035
1036                            // The message was not sent using async send, so we should only
1037                            // ack the local broker when we get confirmation that the remote
1038                            // broker has received the message.
1039                            remoteBroker.asyncRequest(message, new ResponseCallback() {
1040                                @Override
1041                                public void onCompletion(FutureResponse future) {
1042                                    try {
1043                                        Response response = future.getResult();
1044                                        if (response.isException()) {
1045                                            ExceptionResponse er = (ExceptionResponse) response;
1046                                            serviceLocalException(md, er.getException());
1047                                        } else {
1048                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1049                                            dequeueCounter.incrementAndGet();
1050                                        }
1051                                    } catch (IOException e) {
1052                                        serviceLocalException(md, e);
1053                                    } finally {
1054                                        sub.decrementOutstandingResponses();
1055                                    }
1056                                }
1057                            });
1058
1059                        } else {
1060                            // If the message was originally sent using async send, we will
1061                            // preserve that QOS by bridging it using an async send (small chance
1062                            // of message loss).
1063                            try {
1064                                remoteBroker.oneway(message);
1065                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1066                                dequeueCounter.incrementAndGet();
1067                            } finally {
1068                                sub.decrementOutstandingResponses();
1069                            }
1070                        }
1071                        serviceOutbound(message);
1072                    } else {
1073                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
1074                    }
1075                } else if (command.isBrokerInfo()) {
1076                    futureLocalBrokerInfo.set((BrokerInfo) command);
1077                } else if (command.isShutdownInfo()) {
1078                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
1079                    stop();
1080                } else if (command.getClass() == ConnectionError.class) {
1081                    ConnectionError ce = (ConnectionError) command;
1082                    serviceLocalException(ce.getException());
1083                } else {
1084                    switch (command.getDataStructureType()) {
1085                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
1086                            break;
1087                        default:
1088                            LOG.warn("Unexpected local command: {}", command);
1089                    }
1090                }
1091            } catch (Throwable e) {
1092                LOG.warn("Caught an exception processing local command", e);
1093                serviceLocalException(e);
1094            }
1095        }
1096    }
1097
1098    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1099        boolean suppress = false;
1100        // for durable subs, suppression via filter leaves dangling acks so we
1101        // need to check here and allow the ack irrespective
1102        if (sub.getLocalInfo().isDurable()) {
1103            NonCachedMessageEvaluationContext messageEvalContext = new NonCachedMessageEvaluationContext();
1104            messageEvalContext.setMessageReference(md.getMessage());
1105            messageEvalContext.setDestination(md.getDestination());
1106            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1107        }
1108        return suppress;
1109    }
1110
1111    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1112        if (brokerPath != null) {
1113            for (BrokerId id : brokerPath) {
1114                if (brokerId.equals(id)) {
1115                    return true;
1116                }
1117            }
1118        }
1119        return false;
1120    }
1121
1122    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1123        if (brokerPath == null || brokerPath.length == 0) {
1124            return pathsToAppend;
1125        }
1126        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1127        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1128        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1129        return rc;
1130    }
1131
1132    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1133        if (brokerPath == null || brokerPath.length == 0) {
1134            return new BrokerId[]{idToAppend};
1135        }
1136        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1137        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1138        rc[brokerPath.length] = idToAppend;
1139        return rc;
1140    }
1141
1142    protected boolean isPermissableDestination(ActiveMQDestination destination) {
1143        return isPermissableDestination(destination, false);
1144    }
1145
1146    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1147        // Are we not bridging temporary destinations?
1148        if (destination.isTemporary()) {
1149            if (allowTemporary) {
1150                return true;
1151            } else {
1152                return configuration.isBridgeTempDestinations();
1153            }
1154        }
1155
1156        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1157        if (dests != null && dests.length > 0) {
1158            for (ActiveMQDestination dest : dests) {
1159                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1160                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1161                    return true;
1162                }
1163            }
1164        }
1165
1166        dests = excludedDestinations;
1167        if (dests != null && dests.length > 0) {
1168            for (ActiveMQDestination dest : dests) {
1169                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1170                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1171                    return false;
1172                }
1173            }
1174        }
1175
1176        dests = dynamicallyIncludedDestinations;
1177        if (dests != null && dests.length > 0) {
1178            for (ActiveMQDestination dest : dests) {
1179                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1180                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1181                    return true;
1182                }
1183            }
1184
1185            return false;
1186        }
1187        return true;
1188    }
1189
1190    /**
1191     * Subscriptions for these destinations are always created
1192     */
1193    protected void setupStaticDestinations() {
1194        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1195        if (dests != null) {
1196            for (ActiveMQDestination dest : dests) {
1197                DemandSubscription sub = createDemandSubscription(dest);
1198                sub.setStaticallyIncluded(true);
1199                try {
1200                    addSubscription(sub);
1201                } catch (IOException e) {
1202                    LOG.error("Failed to add static destination {}", dest, e);
1203                }
1204                LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
1205            }
1206        }
1207    }
1208
1209    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1210        ConsumerInfo info = consumerInfo.copy();
1211        addRemoteBrokerToBrokerPath(info);
1212        DemandSubscription sub = createDemandSubscription(info);
1213        if (sub != null) {
1214            if (duplicateSuppressionIsRequired(sub)) {
1215                undoMapRegistration(sub);
1216            } else {
1217                if (consumerInfo.isDurable()) {
1218                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
1219                }
1220                addSubscription(sub);
1221                LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
1222            }
1223        }
1224    }
1225
1226    private void undoMapRegistration(DemandSubscription sub) {
1227        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1228        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1229    }
1230
1231    /*
1232     * check our existing subs networkConsumerIds against the list of network
1233     * ids in this subscription A match means a duplicate which we suppress for
1234     * topics and maybe for queues
1235     */
1236    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1237        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1238        boolean suppress = false;
1239
1240        if (isDuplicateSuppressionOff(consumerInfo)) {
1241            return suppress;
1242        }
1243
1244        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1245        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1246        for (Subscription sub : currentSubs) {
1247            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1248            if (!networkConsumers.isEmpty()) {
1249                if (matchFound(candidateConsumers, networkConsumers)) {
1250                    if (isInActiveDurableSub(sub)) {
1251                        suppress = false;
1252                    } else {
1253                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1254                    }
1255                    break;
1256                }
1257            }
1258        }
1259        return suppress;
1260    }
1261
1262    private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) {
1263        return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions()
1264                || consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()
1265                || consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions();
1266    }
1267
1268    private boolean isInActiveDurableSub(Subscription sub) {
1269        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1270    }
1271
1272    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1273        boolean suppress = false;
1274
1275        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1276            LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
1277                    configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
1278            });
1279            suppress = true;
1280        } else {
1281            // remove the existing lower priority duplicate and allow this candidate
1282            try {
1283                removeDuplicateSubscription(existingSub);
1284
1285                LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
1286                        configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
1287                });
1288            } catch (IOException e) {
1289                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
1290            }
1291        }
1292        return suppress;
1293    }
1294
1295    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1296        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1297            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1298                break;
1299            }
1300        }
1301    }
1302
1303    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1304        boolean found = false;
1305        for (ConsumerId aliasConsumer : networkConsumers) {
1306            if (candidateConsumers.contains(aliasConsumer)) {
1307                found = true;
1308                break;
1309            }
1310        }
1311        return found;
1312    }
1313
1314    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1315        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1316        Region region;
1317        Collection<Subscription> subs;
1318
1319        region = null;
1320        switch (dest.getDestinationType()) {
1321            case ActiveMQDestination.QUEUE_TYPE:
1322                region = region_broker.getQueueRegion();
1323                break;
1324            case ActiveMQDestination.TOPIC_TYPE:
1325                region = region_broker.getTopicRegion();
1326                break;
1327            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1328                region = region_broker.getTempQueueRegion();
1329                break;
1330            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1331                region = region_broker.getTempTopicRegion();
1332                break;
1333        }
1334
1335        if (region instanceof AbstractRegion) {
1336            subs = ((AbstractRegion) region).getSubscriptions().values();
1337        } else {
1338            subs = null;
1339        }
1340
1341        return subs;
1342    }
1343
1344    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1345        // add our original id to ourselves
1346        info.addNetworkConsumerId(info.getConsumerId());
1347        return doCreateDemandSubscription(info);
1348    }
1349
1350    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1351        DemandSubscription result = new DemandSubscription(info);
1352        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1353        if (info.getDestination().isTemporary()) {
1354            // reset the local connection Id
1355            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1356            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1357        }
1358
1359        if (configuration.isDecreaseNetworkConsumerPriority()) {
1360            byte priority = (byte) configuration.getConsumerPriorityBase();
1361            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1362                // The longer the path to the consumer, the less it's consumer priority.
1363                priority -= info.getBrokerPath().length + 1;
1364            }
1365            result.getLocalInfo().setPriority(priority);
1366            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
1367        }
1368        configureDemandSubscription(info, result);
1369        return result;
1370    }
1371
1372    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1373        ConsumerInfo info = new ConsumerInfo();
1374        info.setNetworkSubscription(true);
1375        info.setDestination(destination);
1376
1377        // Indicate that this subscription is being made on behalf of the remote broker.
1378        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
1379
1380        // the remote info held by the DemandSubscription holds the original
1381        // consumerId, the local info get's overwritten
1382        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1383        DemandSubscription result = null;
1384        try {
1385            result = createDemandSubscription(info);
1386        } catch (IOException e) {
1387            LOG.error("Failed to create DemandSubscription ", e);
1388        }
1389        return result;
1390    }
1391
1392    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1393        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
1394                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
1395            sub.getLocalInfo().setDispatchAsync(true);
1396        } else {
1397            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1398        }
1399        sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1400        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1401        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1402
1403        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1404        if (!info.isDurable()) {
1405            // This works for now since we use a VM connection to the local broker.
1406            // may need to change if we ever subscribe to a remote broker.
1407            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1408        } else {
1409            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
1410        }
1411    }
1412
1413    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1414        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1415        LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
1416                configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
1417        });
1418        if (sub != null) {
1419            removeSubscription(sub);
1420            LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
1421                    configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
1422            });
1423        }
1424    }
1425
1426    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1427        boolean removeDone = false;
1428        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1429        if (sub != null) {
1430            try {
1431                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1432                removeDone = true;
1433            } catch (IOException e) {
1434                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
1435            }
1436        }
1437        return removeDone;
1438    }
1439
1440    /**
1441     * Performs a timed wait on the started latch and then checks for disposed
1442     * before performing another wait each time the the started wait times out.
1443     */
1444    protected boolean safeWaitUntilStarted() throws InterruptedException {
1445        while (!disposed.get()) {
1446            if (startedLatch.await(1, TimeUnit.SECONDS)) {
1447                break;
1448            }
1449        }
1450        return !disposed.get();
1451    }
1452
1453    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1454        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1455        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1456            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1457            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1458                filterFactory = entry.getNetworkBridgeFilterFactory();
1459            }
1460        }
1461        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
1462    }
1463
1464    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1465        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1466    }
1467
1468    protected BrokerId[] getRemoteBrokerPath() {
1469        return remoteBrokerPath;
1470    }
1471
1472    @Override
1473    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1474        this.networkBridgeListener = listener;
1475    }
1476
1477    private void fireBridgeFailed(Throwable reason) {
1478        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
1479        NetworkBridgeListener l = this.networkBridgeListener;
1480        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1481            l.bridgeFailed();
1482        }
1483    }
1484
1485    /**
1486     * @return Returns the dynamicallyIncludedDestinations.
1487     */
1488    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1489        return dynamicallyIncludedDestinations;
1490    }
1491
1492    /**
1493     * @param dynamicallyIncludedDestinations
1494     *         The dynamicallyIncludedDestinations to set.
1495     */
1496    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1497        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1498    }
1499
1500    /**
1501     * @return Returns the excludedDestinations.
1502     */
1503    public ActiveMQDestination[] getExcludedDestinations() {
1504        return excludedDestinations;
1505    }
1506
1507    /**
1508     * @param excludedDestinations The excludedDestinations to set.
1509     */
1510    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1511        this.excludedDestinations = excludedDestinations;
1512    }
1513
1514    /**
1515     * @return Returns the staticallyIncludedDestinations.
1516     */
1517    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1518        return staticallyIncludedDestinations;
1519    }
1520
1521    /**
1522     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
1523     */
1524    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1525        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1526    }
1527
1528    /**
1529     * @return Returns the durableDestinations.
1530     */
1531    public ActiveMQDestination[] getDurableDestinations() {
1532        return durableDestinations;
1533    }
1534
1535    /**
1536     * @param durableDestinations The durableDestinations to set.
1537     */
1538    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1539        this.durableDestinations = durableDestinations;
1540    }
1541
1542    /**
1543     * @return Returns the localBroker.
1544     */
1545    public Transport getLocalBroker() {
1546        return localBroker;
1547    }
1548
1549    /**
1550     * @return Returns the remoteBroker.
1551     */
1552    public Transport getRemoteBroker() {
1553        return remoteBroker;
1554    }
1555
1556    /**
1557     * @return the createdByDuplex
1558     */
1559    public boolean isCreatedByDuplex() {
1560        return this.createdByDuplex;
1561    }
1562
1563    /**
1564     * @param createdByDuplex the createdByDuplex to set
1565     */
1566    public void setCreatedByDuplex(boolean createdByDuplex) {
1567        this.createdByDuplex = createdByDuplex;
1568    }
1569
1570    @Override
1571    public String getRemoteAddress() {
1572        return remoteBroker.getRemoteAddress();
1573    }
1574
1575    @Override
1576    public String getLocalAddress() {
1577        return localBroker.getRemoteAddress();
1578    }
1579
1580    @Override
1581    public String getRemoteBrokerName() {
1582        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1583    }
1584
1585    @Override
1586    public String getRemoteBrokerId() {
1587        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
1588    }
1589
1590    @Override
1591    public String getLocalBrokerName() {
1592        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1593    }
1594
1595    @Override
1596    public long getDequeueCounter() {
1597        return dequeueCounter.get();
1598    }
1599
1600    @Override
1601    public long getEnqueueCounter() {
1602        return enqueueCounter.get();
1603    }
1604
1605    protected boolean isDuplex() {
1606        return configuration.isDuplex() || createdByDuplex;
1607    }
1608
1609    public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1610        return subscriptionMapByRemoteId;
1611    }
1612
1613    @Override
1614    public void setBrokerService(BrokerService brokerService) {
1615        this.brokerService = brokerService;
1616        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1617        localBrokerPath[0] = localBrokerId;
1618    }
1619
1620    @Override
1621    public void setMbeanObjectName(ObjectName objectName) {
1622        this.mbeanObjectName = objectName;
1623    }
1624
1625    @Override
1626    public ObjectName getMbeanObjectName() {
1627        return mbeanObjectName;
1628    }
1629
1630    @Override
1631    public void resetStats() {
1632        enqueueCounter.set(0);
1633        dequeueCounter.set(0);
1634    }
1635
1636    /*
1637     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1638     * remote sides of the network bridge.
1639     */
1640    private static class FutureBrokerInfo implements Future<BrokerInfo> {
1641
1642        private final CountDownLatch slot = new CountDownLatch(1);
1643        private final AtomicBoolean disposed;
1644        private volatile BrokerInfo info = null;
1645
1646        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1647            this.info = info;
1648            this.disposed = disposed;
1649        }
1650
1651        @Override
1652        public boolean cancel(boolean mayInterruptIfRunning) {
1653            slot.countDown();
1654            return true;
1655        }
1656
1657        @Override
1658        public boolean isCancelled() {
1659            return slot.getCount() == 0 && info == null;
1660        }
1661
1662        @Override
1663        public boolean isDone() {
1664            return info != null;
1665        }
1666
1667        @Override
1668        public BrokerInfo get() throws InterruptedException, ExecutionException {
1669            try {
1670                if (info == null) {
1671                    while (!disposed.get()) {
1672                        if (slot.await(1, TimeUnit.SECONDS)) {
1673                            break;
1674                        }
1675                    }
1676                }
1677                return info;
1678            } catch (InterruptedException e) {
1679                Thread.currentThread().interrupt();
1680                LOG.debug("Operation interrupted: {}", e, e);
1681                throw new InterruptedException("Interrupted.");
1682            }
1683        }
1684
1685        @Override
1686        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1687            try {
1688                if (info == null) {
1689                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1690
1691                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
1692                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
1693                            break;
1694                        }
1695                    }
1696                    if (info == null) {
1697                        throw new TimeoutException();
1698                    }
1699                }
1700                return info;
1701            } catch (InterruptedException e) {
1702                throw new InterruptedException("Interrupted.");
1703            }
1704        }
1705
1706        public void set(BrokerInfo info) {
1707            this.info = info;
1708            this.slot.countDown();
1709        }
1710    }
1711
1712    protected void serviceOutbound(Message message) {
1713        NetworkBridgeListener l = this.networkBridgeListener;
1714        if (l != null) {
1715            l.onOutboundMessage(this, message);
1716        }
1717    }
1718
1719    protected void serviceInboundMessage(Message message) {
1720        NetworkBridgeListener l = this.networkBridgeListener;
1721        if (l != null) {
1722            l.onInboundMessage(this, message);
1723        }
1724    }
1725
1726    protected boolean canDuplexDispatch(Message message) {
1727        boolean result = true;
1728        if (configuration.isCheckDuplicateMessagesOnDuplex()){
1729            final long producerSequenceId = message.getMessageId().getProducerSequenceId();
1730            //  messages are multiplexed on this producer so we need to query the persistenceAdapter
1731            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
1732            if (producerSequenceId <= lastStoredForMessageProducer) {
1733                result = false;
1734                LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
1735                        (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
1736                });
1737            }
1738        }
1739        return result;
1740    }
1741
1742    protected long getStoredSequenceIdForMessage(MessageId messageId) {
1743        try {
1744            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
1745        } catch (IOException ignored) {
1746            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
1747        }
1748        return -1;
1749    }
1750
1751}