001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Properties;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Executors;
033import java.util.concurrent.Future;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.atomic.AtomicLong;
038
039import javax.management.ObjectName;
040
041import org.apache.activemq.DestinationDoesNotExistException;
042import org.apache.activemq.Service;
043import org.apache.activemq.advisory.AdvisoryBroker;
044import org.apache.activemq.advisory.AdvisorySupport;
045import org.apache.activemq.broker.BrokerService;
046import org.apache.activemq.broker.BrokerServiceAware;
047import org.apache.activemq.broker.ConnectionContext;
048import org.apache.activemq.broker.TransportConnection;
049import org.apache.activemq.broker.region.AbstractRegion;
050import org.apache.activemq.broker.region.DurableTopicSubscription;
051import org.apache.activemq.broker.region.Region;
052import org.apache.activemq.broker.region.RegionBroker;
053import org.apache.activemq.broker.region.Subscription;
054import org.apache.activemq.broker.region.policy.PolicyEntry;
055import org.apache.activemq.command.ActiveMQDestination;
056import org.apache.activemq.command.ActiveMQMessage;
057import org.apache.activemq.command.ActiveMQTempDestination;
058import org.apache.activemq.command.ActiveMQTopic;
059import org.apache.activemq.command.BrokerId;
060import org.apache.activemq.command.BrokerInfo;
061import org.apache.activemq.command.Command;
062import org.apache.activemq.command.ConnectionError;
063import org.apache.activemq.command.ConnectionId;
064import org.apache.activemq.command.ConnectionInfo;
065import org.apache.activemq.command.ConsumerId;
066import org.apache.activemq.command.ConsumerInfo;
067import org.apache.activemq.command.DataStructure;
068import org.apache.activemq.command.DestinationInfo;
069import org.apache.activemq.command.ExceptionResponse;
070import org.apache.activemq.command.KeepAliveInfo;
071import org.apache.activemq.command.Message;
072import org.apache.activemq.command.MessageAck;
073import org.apache.activemq.command.MessageDispatch;
074import org.apache.activemq.command.MessageId;
075import org.apache.activemq.command.NetworkBridgeFilter;
076import org.apache.activemq.command.ProducerInfo;
077import org.apache.activemq.command.RemoveInfo;
078import org.apache.activemq.command.RemoveSubscriptionInfo;
079import org.apache.activemq.command.Response;
080import org.apache.activemq.command.SessionInfo;
081import org.apache.activemq.command.ShutdownInfo;
082import org.apache.activemq.command.SubscriptionInfo;
083import org.apache.activemq.command.WireFormatInfo;
084import org.apache.activemq.filter.DestinationFilter;
085import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
086import org.apache.activemq.security.SecurityContext;
087import org.apache.activemq.transport.DefaultTransportListener;
088import org.apache.activemq.transport.FutureResponse;
089import org.apache.activemq.transport.ResponseCallback;
090import org.apache.activemq.transport.Transport;
091import org.apache.activemq.transport.TransportDisposedIOException;
092import org.apache.activemq.transport.TransportFilter;
093import org.apache.activemq.transport.nio.NIOSSLTransport;
094import org.apache.activemq.transport.failover.FailoverTransport;
095import org.apache.activemq.transport.tcp.SslTransport;
096import org.apache.activemq.util.IdGenerator;
097import org.apache.activemq.util.IntrospectionSupport;
098import org.apache.activemq.util.LongSequenceGenerator;
099import org.apache.activemq.util.MarshallingSupport;
100import org.apache.activemq.util.ServiceStopper;
101import org.apache.activemq.util.ServiceSupport;
102import org.slf4j.Logger;
103import org.slf4j.LoggerFactory;
104
105/**
106 * A useful base class for implementing demand forwarding bridges.
107 */
108public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
    // NOTE(review): presumably the name prefix for durable subscriptions created by the bridge;
    // it is not referenced in the part of the class reviewed here - confirm at the use sites.
    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
    // Transports for the two ends of the bridge: handed to the constructor, started in start()
    // and stopped in stop().
    protected final Transport localBroker;
    protected final Transport remoteBroker;
    // Source of connection ids; collectBrokerInfos() swaps in a generator seeded with the broker
    // names when configuration.isUseBrokerNamesAsIdSeed() is true.
    protected IdGenerator idGenerator = new IdGenerator();
    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    // Connection/session/producer state: the local pieces are created by startLocalBridge(),
    // the remote pieces (remoteConnectionInfo, producerInfo, demandConsumerInfo) by startRemoteBridge().
    protected ConnectionInfo localConnectionInfo;
    protected ConnectionInfo remoteConnectionInfo;
    protected SessionInfo localSessionInfo;
    protected ProducerInfo producerInfo;
    // Stays "Unknown" until collectBrokerInfos() copies the name out of the remote BrokerInfo.
    protected String remoteBrokerName = "Unknown";
    // Client id of the local connection; assigned in startLocalBridge().
    protected String localClientId;
    protected ConsumerInfo demandConsumerInfo;
    protected int demandConsumerDispatched;
    // Flipped with compareAndSet so that each side of the bridge is started at most once;
    // stop() resets remoteBridgeStarted.
    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
    // Checked before either side is started; a failed bridge is not started.
    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
    // Set exactly once by stop(); most entry points return early once it is true.
    protected final AtomicBoolean disposed = new AtomicBoolean();
    // Compared against the remote broker id in collectBrokerInfos() to detect a loop-back
    // connection. It is assigned outside the code reviewed here.
    protected BrokerId localBrokerId;
    // Destination filters; for duplex-created bridges the excluded/static/dynamic arrays are
    // refreshed from the configuration in doStartLocalAndRemoteBridges().
    protected ActiveMQDestination[] excludedDestinations;
    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
    protected ActiveMQDestination[] staticallyIncludedDestinations;
    protected ActiveMQDestination[] durableDestinations;
    // Demand subscriptions indexed by consumer id (local-side id and remote-side id respectively).
    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
    // Two counts: one released by startLocalBridge(), one by startRemoteBridge(). stop() releases
    // both so that waiters are never left blocked.
    protected final CountDownLatch startedLatch = new CountDownLatch(2);
    // Released by startLocalBridge() and, as a fallback, by stop().
    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
    protected NetworkBridgeConfiguration configuration;
    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();

    // Single-element path whose slot is filled with the remote broker id in collectBrokerInfos().
    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
    protected BrokerId remoteBrokerId;

    final AtomicLong enqueueCounter = new AtomicLong();
    final AtomicLong dequeueCounter = new AtomicLong();

    // Optional observer; onStart() is invoked from startLocalBridge() and onStop() from stop().
    private NetworkBridgeListener networkBridgeListener;
    // Forwarded to Broker.networkBridgeStarted() in startLocalBridge().
    private boolean createdByDuplex;
    // Set directly by duplexStart(), or by collectBrokerInfos() once the futures below complete.
    private BrokerInfo localBrokerInfo;
    private BrokerInfo remoteBrokerInfo;

    // Awaited by collectBrokerInfos(); the remote one is completed when a BrokerInfo command
    // arrives in serviceRemoteCommand(), and both are cancelled by stop() or by a transport
    // error that arrives while they are still pending.
    // NOTE(review): the first constructor argument is the field value at construction time,
    // which is still null because the fields above have no initializer.
    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);

    // Guards start()/stop() so that each runs its body at most once per transition.
    private final AtomicBoolean started = new AtomicBoolean();
    // Connection that initiated a duplex bridge; recorded by duplexStart().
    private TransportConnection duplexInitiatingConnection;
    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
    // Must be non-null before start() is called, otherwise start() throws.
    protected BrokerService brokerService = null;
    private ObjectName mbeanObjectName;
    // Single-threaded executor that stop() shuts down before sending ShutdownInfo.
    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
    // Extra local transport created in start() when isDuplex() is true, plus the producer that
    // startLocalBridge() registers on it for a duplex configuration.
    private Transport duplexInboundLocalBroker = null;
    private ProducerInfo duplexInboundLocalProducerInfo;
163
164    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
165        this.configuration = configuration;
166        this.localBroker = localBroker;
167        this.remoteBroker = remoteBroker;
168    }
169
170    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
171        this.localBrokerInfo = localBrokerInfo;
172        this.remoteBrokerInfo = remoteBrokerInfo;
173        this.duplexInitiatingConnection = connection;
174        start();
175        serviceRemoteCommand(remoteBrokerInfo);
176    }
177
178    @Override
179    public void start() throws Exception {
180        if (started.compareAndSet(false, true)) {
181
182            if (brokerService == null) {
183                throw new IllegalArgumentException("BrokerService is null on " + this);
184            }
185
186            if (isDuplex()) {
187                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
188                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
189
190                    @Override
191                    public void onCommand(Object o) {
192                        Command command = (Command) o;
193                        serviceLocalCommand(command);
194                    }
195
196                    @Override
197                    public void onException(IOException error) {
198                        serviceLocalException(error);
199                    }
200                });
201                duplexInboundLocalBroker.start();
202            }
203
204            localBroker.setTransportListener(new DefaultTransportListener() {
205
206                @Override
207                public void onCommand(Object o) {
208                    Command command = (Command) o;
209                    serviceLocalCommand(command);
210                }
211
212                @Override
213                public void onException(IOException error) {
214                    if (!futureLocalBrokerInfo.isDone()) {
215                        LOG.info("error with pending local brokerInfo on: " + localBroker, error);
216                        futureLocalBrokerInfo.cancel(true);
217                        return;
218                    }
219                    serviceLocalException(error);
220                }
221            });
222
223            remoteBroker.setTransportListener(new DefaultTransportListener() {
224
225                @Override
226                public void onCommand(Object o) {
227                    Command command = (Command) o;
228                    serviceRemoteCommand(command);
229                }
230
231                @Override
232                public void onException(IOException error) {
233                    if (!futureRemoteBrokerInfo.isDone()) {
234                        LOG.info("error with pending remote brokerInfo on: " + remoteBroker, error);
235                        futureRemoteBrokerInfo.cancel(true);
236                        return;
237                    }
238                    serviceRemoteException(error);
239                }
240            });
241
242            remoteBroker.start();
243            localBroker.start();
244
245            if (!disposed.get()) {
246                try {
247                    triggerStartAsyncNetworkBridgeCreation();
248                } catch (IOException e) {
249                    LOG.warn("Caught exception from remote start", e);
250                }
251            } else {
252                LOG.warn("Bridge was disposed before the start() method was fully executed.");
253                throw new TransportDisposedIOException();
254            }
255        }
256    }
257
258    @Override
259    public void stop() throws Exception {
260        if (started.compareAndSet(true, false)) {
261            if (disposed.compareAndSet(false, true)) {
262                LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
263
264                futureRemoteBrokerInfo.cancel(true);
265                futureLocalBrokerInfo.cancel(true);
266
267                NetworkBridgeListener l = this.networkBridgeListener;
268                if (l != null) {
269                    l.onStop(this);
270                }
271                try {
272                    // local start complete
273                    if (startedLatch.getCount() < 2) {
274                        LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
275                                configuration.getBrokerName(), this, remoteBrokerName
276                        });
277                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
278                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
279                    }
280
281                    remoteBridgeStarted.set(false);
282                    final CountDownLatch sendShutdown = new CountDownLatch(1);
283
284                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
285                        @Override
286                        public void run() {
287                            try {
288                                serialExecutor.shutdown();
289                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
290                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
291                                    LOG.info("pending tasks on stop {}", pendingTasks);
292                                }
293                                localBroker.oneway(new ShutdownInfo());
294                                remoteBroker.oneway(new ShutdownInfo());
295                            } catch (Throwable e) {
296                                LOG.debug("Caught exception sending shutdown", e);
297                            } finally {
298                                sendShutdown.countDown();
299                            }
300
301                        }
302                    }, "ActiveMQ ForwardingBridge StopTask");
303
304                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
305                        LOG.info("Network Could not shutdown in a timely manner");
306                    }
307                } finally {
308                    ServiceStopper ss = new ServiceStopper();
309                    stopFailoverTransport(remoteBroker);
310                    ss.stop(remoteBroker);
311                    ss.stop(localBroker);
312                    ss.stop(duplexInboundLocalBroker);
313                    // Release the started Latch since another thread could be
314                    // stuck waiting for it to start up.
315                    startedLatch.countDown();
316                    startedLatch.countDown();
317                    localStartedLatch.countDown();
318
319                    ss.throwFirstException();
320                }
321            }
322
323            LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
324        }
325    }
326
327    private void stopFailoverTransport(Transport transport) {
328        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
329        if (failoverTransport != null) {
330            // may be blocked on write, in which case stop will block
331            try {
332                failoverTransport.handleTransportFailure(new IOException("Bridge stopped"));
333            } catch (InterruptedException ignored) {}
334        }
335    }
336
337    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
338        brokerService.getTaskRunnerFactory().execute(new Runnable() {
339            @Override
340            public void run() {
341                final String originalName = Thread.currentThread().getName();
342                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
343                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
344
345                try {
346                    // First we collect the info data from both the local and remote ends
347                    collectBrokerInfos();
348
349                    // Once we have all required broker info we can attempt to start
350                    // the local and then remote sides of the bridge.
351                    doStartLocalAndRemoteBridges();
352                } finally {
353                    Thread.currentThread().setName(originalName);
354                }
355            }
356        });
357    }
358
359    private void collectBrokerInfos() {
360
361        // First wait for the remote to feed us its BrokerInfo, then we can check on
362        // the LocalBrokerInfo and decide is this is a loop.
363        try {
364            remoteBrokerInfo = futureRemoteBrokerInfo.get();
365            if (remoteBrokerInfo == null) {
366                serviceLocalException(new Throwable("remoteBrokerInfo is null"));
367                return;
368            }
369        } catch (Exception e) {
370            serviceRemoteException(e);
371            return;
372        }
373
374        try {
375            localBrokerInfo = futureLocalBrokerInfo.get();
376            if (localBrokerInfo == null) {
377                serviceLocalException(new Throwable("localBrokerInfo is null"));
378                return;
379            }
380
381            // Before we try and build the bridge lets check if we are in a loop
382            // and if so just stop now before registering anything.
383            remoteBrokerId = remoteBrokerInfo.getBrokerId();
384            if (localBrokerId.equals(remoteBrokerId)) {
385                LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
386                        configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
387                });
388                ServiceSupport.dispose(localBroker);
389                ServiceSupport.dispose(remoteBroker);
390                // the bridge is left in a bit of limbo, but it won't get retried
391                // in this state.
392                return;
393            }
394
395            // Fill in the remote broker's information now.
396            remoteBrokerPath[0] = remoteBrokerId;
397            remoteBrokerName = remoteBrokerInfo.getBrokerName();
398            if (configuration.isUseBrokerNamesAsIdSeed()) {
399                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
400            }
401        } catch (Throwable e) {
402            serviceLocalException(e);
403        }
404    }
405
406    private void doStartLocalAndRemoteBridges() {
407
408        if (disposed.get()) {
409            return;
410        }
411
412        if (isCreatedByDuplex()) {
413            // apply remote (propagated) configuration to local duplex bridge before start
414            Properties props = null;
415            try {
416                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
417                IntrospectionSupport.getProperties(configuration, props, null);
418                if (configuration.getExcludedDestinations() != null) {
419                    excludedDestinations = configuration.getExcludedDestinations().toArray(
420                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
421                }
422                if (configuration.getStaticallyIncludedDestinations() != null) {
423                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
424                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
425                }
426                if (configuration.getDynamicallyIncludedDestinations() != null) {
427                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
428                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
429                }
430            } catch (Throwable t) {
431                LOG.error("Error mapping remote configuration: {}", props, t);
432            }
433        }
434
435        try {
436            startLocalBridge();
437        } catch (Throwable e) {
438            serviceLocalException(e);
439            return;
440        }
441
442        try {
443            startRemoteBridge();
444        } catch (Throwable e) {
445            serviceRemoteException(e);
446            return;
447        }
448
449        try {
450            if (safeWaitUntilStarted()) {
451                setupStaticDestinations();
452            }
453        } catch (Throwable e) {
454            serviceLocalException(e);
455        }
456    }
457
458    private void startLocalBridge() throws Throwable {
459        if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
460            synchronized (this) {
461                LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
462                if (!disposed.get()) {
463
464                    if (idGenerator == null) {
465                        throw new IllegalStateException("Id Generator cannot be null");
466                    }
467
468                    localConnectionInfo = new ConnectionInfo();
469                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
470                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
471                    localConnectionInfo.setClientId(localClientId);
472                    localConnectionInfo.setUserName(configuration.getUserName());
473                    localConnectionInfo.setPassword(configuration.getPassword());
474                    Transport originalTransport = remoteBroker;
475                    while (originalTransport instanceof TransportFilter) {
476                        originalTransport = ((TransportFilter) originalTransport).getNext();
477                    }
478                    setTransportContext(originalTransport, localConnectionInfo);
479                    // sync requests that may fail
480                    Object resp = localBroker.request(localConnectionInfo);
481                    if (resp instanceof ExceptionResponse) {
482                        throw ((ExceptionResponse) resp).getException();
483                    }
484                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
485                    localBroker.oneway(localSessionInfo);
486
487                    if (configuration.isDuplex()) {
488                        // separate in-bound channel for forwards so we don't
489                        // contend with out-bound dispatch on same connection
490                        remoteBrokerInfo.setNetworkConnection(true);
491                        duplexInboundLocalBroker.oneway(remoteBrokerInfo);
492
493                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
494                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
495                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
496                                + configuration.getBrokerName());
497                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
498                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
499
500                        setTransportContext(originalTransport, duplexLocalConnectionInfo);
501
502                        // sync requests that may fail
503                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
504                        if (resp instanceof ExceptionResponse) {
505                            throw ((ExceptionResponse) resp).getException();
506                        }
507                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
508                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
509                        duplexInboundLocalBroker.oneway(duplexInboundSession);
510                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
511                    }
512                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
513                    NetworkBridgeListener l = this.networkBridgeListener;
514                    if (l != null) {
515                        l.onStart(this);
516                    }
517
518                    // Let the local broker know the remote broker's ID.
519                    localBroker.oneway(remoteBrokerInfo);
520                    // new peer broker (a consumer can work with remote broker also)
521                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
522
523                    LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
524                            localBroker, remoteBroker, remoteBrokerName
525                    });
526                    LOG.trace("{} register bridge ({}) to {}", new Object[]{
527                            configuration.getBrokerName(), this, remoteBrokerName
528                    });
529                } else {
530                    LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
531                }
532                startedLatch.countDown();
533                localStartedLatch.countDown();
534            }
535        }
536    }
537
538    private void setTransportContext(Transport transport, ConnectionInfo connectionInfo) {
539        if (transport instanceof SslTransport) {
540            connectionInfo.setTransportContext(((SslTransport)transport).getPeerCertificates());
541        } else if (transport instanceof NIOSSLTransport) {
542            connectionInfo.setTransportContext(((NIOSSLTransport)transport).getPeerCertificates());
543        }
544    }
545
546    protected void startRemoteBridge() throws Exception {
547        if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
548            LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
549            synchronized (this) {
550                if (!isCreatedByDuplex()) {
551                    BrokerInfo brokerInfo = new BrokerInfo();
552                    brokerInfo.setBrokerName(configuration.getBrokerName());
553                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
554                    brokerInfo.setNetworkConnection(true);
555                    brokerInfo.setDuplexConnection(configuration.isDuplex());
556                    // set our properties
557                    Properties props = new Properties();
558                    IntrospectionSupport.getProperties(configuration, props, null);
559                    props.remove("networkTTL");
560                    String str = MarshallingSupport.propertiesToString(props);
561                    brokerInfo.setNetworkProperties(str);
562                    brokerInfo.setBrokerId(this.localBrokerId);
563                    remoteBroker.oneway(brokerInfo);
564                }
565                if (remoteConnectionInfo != null) {
566                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
567                }
568                remoteConnectionInfo = new ConnectionInfo();
569                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
570                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
571                remoteConnectionInfo.setUserName(configuration.getUserName());
572                remoteConnectionInfo.setPassword(configuration.getPassword());
573                remoteBroker.oneway(remoteConnectionInfo);
574
575                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
576                remoteBroker.oneway(remoteSessionInfo);
577                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
578                producerInfo.setResponseRequired(false);
579                remoteBroker.oneway(producerInfo);
580                // Listen to consumer advisory messages on the remote broker to determine demand.
581                if (!configuration.isStaticBridge()) {
582                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
583                    // always dispatch advisory message asynchronously so that
584                    // we never block the producer broker if we are slow
585                    demandConsumerInfo.setDispatchAsync(true);
586                    String advisoryTopic = configuration.getDestinationFilter();
587                    if (configuration.isBridgeTempDestinations()) {
588                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
589                    }
590                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
591                    demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
592                    remoteBroker.oneway(demandConsumerInfo);
593                }
594                startedLatch.countDown();
595            }
596        }
597    }
598
599    @Override
600    public void serviceRemoteException(Throwable error) {
601        if (!disposed.get()) {
602            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
603                LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
604                        localBroker, remoteBroker, error
605                });
606            } else {
607                LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
608                        localBroker, remoteBroker, error
609                });
610            }
611            LOG.debug("The remote Exception was: {}", error, error);
612            brokerService.getTaskRunnerFactory().execute(new Runnable() {
613                @Override
614                public void run() {
615                    ServiceSupport.dispose(getControllingService());
616                }
617            });
618            fireBridgeFailed(error);
619        }
620    }
621
622    protected void serviceRemoteCommand(Command command) {
623        if (!disposed.get()) {
624            try {
625                if (command.isMessageDispatch()) {
626                    safeWaitUntilStarted();
627                    MessageDispatch md = (MessageDispatch) command;
628                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
629                    ackAdvisory(md.getMessage());
630                } else if (command.isBrokerInfo()) {
631                    futureRemoteBrokerInfo.set((BrokerInfo) command);
632                } else if (command.getClass() == ConnectionError.class) {
633                    ConnectionError ce = (ConnectionError) command;
634                    serviceRemoteException(ce.getException());
635                } else {
636                    if (isDuplex()) {
637                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
638                        if (command.isMessage()) {
639                            final ActiveMQMessage message = (ActiveMQMessage) command;
640                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
641                                serviceRemoteConsumerAdvisory(message.getDataStructure());
642                                ackAdvisory(message);
643                            } else {
644                                if (!isPermissableDestination(message.getDestination(), true)) {
645                                    return;
646                                }
647                                safeWaitUntilStarted();
648                                // message being forwarded - we need to
649                                // propagate the response to our local send
650                                if (canDuplexDispatch(message)) {
651                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
652                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
653                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
654                                            final int correlationId = message.getCommandId();
655
656                                            @Override
657                                            public void onCompletion(FutureResponse resp) {
658                                                try {
659                                                    Response reply = resp.getResult();
660                                                    reply.setCorrelationId(correlationId);
661                                                    remoteBroker.oneway(reply);
662                                                } catch (IOException error) {
663                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
664                                                    serviceRemoteException(error);
665                                                }
666                                            }
667                                        });
668                                    } else {
669                                        duplexInboundLocalBroker.oneway(message);
670                                    }
671                                    serviceInboundMessage(message);
672                                } else {
673                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
674                                        Response reply = new Response();
675                                        reply.setCorrelationId(message.getCommandId());
676                                        remoteBroker.oneway(reply);
677                                    }
678                                }
679                            }
680                        } else {
681                            switch (command.getDataStructureType()) {
682                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
683                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
684                                        // end of initiating connection setup - propogate to initial connection to get mbean by clientid
685                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
686                                    } else {
687                                        localBroker.oneway(command);
688                                    }
689                                    break;
690                                case SessionInfo.DATA_STRUCTURE_TYPE:
691                                    localBroker.oneway(command);
692                                    break;
693                                case ProducerInfo.DATA_STRUCTURE_TYPE:
694                                    // using duplexInboundLocalProducerInfo
695                                    break;
696                                case MessageAck.DATA_STRUCTURE_TYPE:
697                                    MessageAck ack = (MessageAck) command;
698                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
699                                    if (localSub != null) {
700                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
701                                        localBroker.oneway(ack);
702                                    } else {
703                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
704                                    }
705                                    break;
706                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
707                                    localStartedLatch.await();
708                                    if (started.get()) {
709                                        final ConsumerInfo consumerInfo = (ConsumerInfo) command;
710                                        if (isDuplicateSuppressionOff(consumerInfo)) {
711                                            addConsumerInfo(consumerInfo);
712                                        } else {
713                                            synchronized (brokerService.getVmConnectorURI()) {
714                                                addConsumerInfo(consumerInfo);
715                                            }
716                                        }
717                                    } else {
718                                        // received a subscription whilst stopping
719                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
720                                    }
721                                    break;
722                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
723                                    // initiator is shutting down, controlled case
724                                    // abortive close dealt with by inactivity monitor
725                                    LOG.info("Stopping network bridge on shutdown of remote broker");
726                                    serviceRemoteException(new IOException(command.toString()));
727                                    break;
728                                default:
729                                    LOG.debug("Ignoring remote command: {}", command);
730                            }
731                        }
732                    } else {
733                        switch (command.getDataStructureType()) {
734                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
735                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
736                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
737                                break;
738                            default:
739                                LOG.warn("Unexpected remote command: {}", command);
740                        }
741                    }
742                }
743            } catch (Throwable e) {
744                LOG.debug("Exception processing remote command: {}", command, e);
745                serviceRemoteException(e);
746            }
747        }
748    }
749
750    private void ackAdvisory(Message message) throws IOException {
751        demandConsumerDispatched++;
752        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
753            final MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
754            ack.setConsumerId(demandConsumerInfo.getConsumerId());
755            brokerService.getTaskRunnerFactory().execute(new Runnable() {
756                @Override
757                public void run() {
758                    try {
759                        remoteBroker.oneway(ack);
760                    } catch (IOException e) {
761                        LOG.warn("Failed to send advisory ack " + ack, e);
762                    }
763                }
764            });
765            demandConsumerDispatched = 0;
766        }
767    }
768
769    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
770        final int networkTTL = configuration.getConsumerTTL();
771        if (data.getClass() == ConsumerInfo.class) {
772            // Create a new local subscription
773            ConsumerInfo info = (ConsumerInfo) data;
774            BrokerId[] path = info.getBrokerPath();
775
776            if (info.isBrowser()) {
777                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
778                return;
779            }
780
781            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
782                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
783                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
784                });
785                return;
786            }
787
788            if (contains(path, localBrokerPath[0])) {
789                // Ignore this consumer as it's a consumer we locally sent to the broker.
790                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
791                        configuration.getBrokerName(), remoteBrokerName, info
792                });
793                return;
794            }
795
796            if (!isPermissableDestination(info.getDestination())) {
797                // ignore if not in the permitted or in the excluded list
798                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
799                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
800                });
801                return;
802            }
803
804            // in a cyclic network there can be multiple bridges per broker that can propagate
805            // a network subscription so there is a need to synchronize on a shared entity
806            // if duplicate suppression is required
807            if (isDuplicateSuppressionOff(info)) {
808                addConsumerInfo(info);
809            } else {
810                synchronized (brokerService.getVmConnectorURI()) {
811                    addConsumerInfo(info);
812                }
813            }
814        } else if (data.getClass() == DestinationInfo.class) {
815            // It's a destination info - we want to pass up information about temporary destinations
816            final DestinationInfo destInfo = (DestinationInfo) data;
817            BrokerId[] path = destInfo.getBrokerPath();
818            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
819                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
820                        configuration.getBrokerName(), destInfo, networkTTL
821                });
822                return;
823            }
824            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
825                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
826                return;
827            }
828            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
829            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
830                // re-set connection id so comes from here
831                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
832                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
833            }
834            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
835            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
836                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
837            });
838            if (destInfo.isRemoveOperation()) {
839                // Serialize with removeSub operations such that all removeSub advisories
840                // are generated
841                serialExecutor.execute(new Runnable() {
842                    @Override
843                    public void run() {
844                        try {
845                            localBroker.oneway(destInfo);
846                        } catch (IOException e) {
847                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
848                        }
849                    }
850                });
851            } else {
852                localBroker.oneway(destInfo);
853            }
854        } else if (data.getClass() == RemoveInfo.class) {
855            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
856            removeDemandSubscription(id);
857        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
858            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
859            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
860            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
861                DemandSubscription ds = i.next();
862                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
863                if (removed) {
864                    if (ds.getDurableRemoteSubs().isEmpty()) {
865
866                        // deactivate subscriber
867                        RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
868                        localBroker.oneway(removeInfo);
869
870                        // remove subscriber
871                        RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
872                        sending.setClientId(localClientId);
873                        sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
874                        sending.setConnectionId(this.localConnectionInfo.getConnectionId());
875                        localBroker.oneway(sending);
876                    }
877                }
878            }
879        }
880    }
881
882    @Override
883    public void serviceLocalException(Throwable error) {
884        serviceLocalException(null, error);
885    }
886
887    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
888        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
889        if (!disposed.get()) {
890            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
891                // not a reason to terminate the bridge - temps can disappear with
892                // pending sends as the demand sub may outlive the remote dest
893                if (messageDispatch != null) {
894                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
895                    try {
896                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
897                        poisonAck.setPoisonCause(error);
898                        localBroker.oneway(poisonAck);
899                    } catch (IOException ioe) {
900                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
901                    }
902                    fireFailedForwardAdvisory(messageDispatch, error);
903                } else {
904                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
905                }
906                return;
907            }
908
909            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
910            LOG.debug("The local Exception was: {}", error, error);
911
912            brokerService.getTaskRunnerFactory().execute(new Runnable() {
913                @Override
914                public void run() {
915                    ServiceSupport.dispose(getControllingService());
916                }
917            });
918            fireBridgeFailed(error);
919        }
920    }
921
922    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
923        if (configuration.isAdvisoryForFailedForward()) {
924            AdvisoryBroker advisoryBroker = null;
925            try {
926                advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
927
928                if (advisoryBroker != null) {
929                    ConnectionContext context = new ConnectionContext();
930                    context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
931                    context.setBroker(brokerService.getBroker());
932
933                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
934                    advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
935                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
936                            advisoryMessage);
937
938                }
939            } catch (Exception e) {
940                LOG.warn("failed to fire forward failure advisory, cause: {}", e);
941                LOG.debug("detail", e);
942            }
943        }
944    }
945
946    protected Service getControllingService() {
947        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
948    }
949
950    protected void addSubscription(DemandSubscription sub) throws IOException {
951        if (sub != null) {
952            localBroker.oneway(sub.getLocalInfo());
953        }
954    }
955
956    protected void removeSubscription(final DemandSubscription sub) throws IOException {
957        if (sub != null) {
958            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
959
960            // ensure not available for conduit subs pending removal
961            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
962            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
963
964            // continue removal in separate thread to free up this thread for outstanding responses
965            // Serialize with removeDestination operations so that removeSubs are serialized with
966            // removeDestinations such that all removeSub advisories are generated
967            serialExecutor.execute(new Runnable() {
968                @Override
969                public void run() {
970                    sub.waitForCompletion();
971                    try {
972                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
973                    } catch (IOException e) {
974                        LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
975                    }
976                }
977            });
978        }
979    }
980
981    protected Message configureMessage(MessageDispatch md) throws IOException {
982        Message message = md.getMessage().copy();
983        // Update the packet to show where it came from.
984        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
985        message.setProducerId(producerInfo.getProducerId());
986        message.setDestination(md.getDestination());
987        message.setMemoryUsage(null);
988        if (message.getOriginalTransactionId() == null) {
989            message.setOriginalTransactionId(message.getTransactionId());
990        }
991        message.setTransactionId(null);
992        if (configuration.isUseCompression()) {
993            message.compress();
994        }
995        return message;
996    }
997
998    protected void serviceLocalCommand(Command command) {
999        if (!disposed.get()) {
1000            try {
1001                if (command.isMessageDispatch()) {
1002                    safeWaitUntilStarted();
1003                    enqueueCounter.incrementAndGet();
1004                    final MessageDispatch md = (MessageDispatch) command;
1005                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
1006                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
1007
1008                        if (suppressMessageDispatch(md, sub)) {
1009                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
1010                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
1011                            });
1012                            // still ack as it may be durable
1013                            try {
1014                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1015                            } finally {
1016                                sub.decrementOutstandingResponses();
1017                            }
1018                            return;
1019                        }
1020
1021                        Message message = configureMessage(md);
1022                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
1023                                configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
1024                        });
1025                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
1026                            try {
1027                                // never request b/c they are eventually acked async
1028                                remoteBroker.oneway(message);
1029                            } finally {
1030                                sub.decrementOutstandingResponses();
1031                            }
1032                            return;
1033                        }
1034
1035                        if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
1036
1037                            // The message was not sent using async send, so we should only
1038                            // ack the local broker when we get confirmation that the remote
1039                            // broker has received the message.
1040                            remoteBroker.asyncRequest(message, new ResponseCallback() {
1041                                @Override
1042                                public void onCompletion(FutureResponse future) {
1043                                    try {
1044                                        Response response = future.getResult();
1045                                        if (response.isException()) {
1046                                            ExceptionResponse er = (ExceptionResponse) response;
1047                                            serviceLocalException(md, er.getException());
1048                                        } else {
1049                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1050                                            dequeueCounter.incrementAndGet();
1051                                        }
1052                                    } catch (IOException e) {
1053                                        serviceLocalException(md, e);
1054                                    } finally {
1055                                        sub.decrementOutstandingResponses();
1056                                    }
1057                                }
1058                            });
1059
1060                        } else {
1061                            // If the message was originally sent using async send, we will
1062                            // preserve that QOS by bridging it using an async send (small chance
1063                            // of message loss).
1064                            try {
1065                                remoteBroker.oneway(message);
1066                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1067                                dequeueCounter.incrementAndGet();
1068                            } finally {
1069                                sub.decrementOutstandingResponses();
1070                            }
1071                        }
1072                        serviceOutbound(message);
1073                    } else {
1074                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
1075                    }
1076                } else if (command.isBrokerInfo()) {
1077                    futureLocalBrokerInfo.set((BrokerInfo) command);
1078                } else if (command.isShutdownInfo()) {
1079                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
1080                    stop();
1081                } else if (command.getClass() == ConnectionError.class) {
1082                    ConnectionError ce = (ConnectionError) command;
1083                    serviceLocalException(ce.getException());
1084                } else {
1085                    switch (command.getDataStructureType()) {
1086                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
1087                            break;
1088                        default:
1089                            LOG.warn("Unexpected local command: {}", command);
1090                    }
1091                }
1092            } catch (Throwable e) {
1093                LOG.warn("Caught an exception processing local command", e);
1094                serviceLocalException(e);
1095            }
1096        }
1097    }
1098
1099    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1100        boolean suppress = false;
1101        // for durable subs, suppression via filter leaves dangling acks so we
1102        // need to check here and allow the ack irrespective
1103        if (sub.getLocalInfo().isDurable()) {
1104            NonCachedMessageEvaluationContext messageEvalContext = new NonCachedMessageEvaluationContext();
1105            messageEvalContext.setMessageReference(md.getMessage());
1106            messageEvalContext.setDestination(md.getDestination());
1107            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1108        }
1109        return suppress;
1110    }
1111
1112    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1113        if (brokerPath != null) {
1114            for (BrokerId id : brokerPath) {
1115                if (brokerId.equals(id)) {
1116                    return true;
1117                }
1118            }
1119        }
1120        return false;
1121    }
1122
1123    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1124        if (brokerPath == null || brokerPath.length == 0) {
1125            return pathsToAppend;
1126        }
1127        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1128        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1129        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1130        return rc;
1131    }
1132
1133    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1134        if (brokerPath == null || brokerPath.length == 0) {
1135            return new BrokerId[]{idToAppend};
1136        }
1137        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1138        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1139        rc[brokerPath.length] = idToAppend;
1140        return rc;
1141    }
1142
1143    protected boolean isPermissableDestination(ActiveMQDestination destination) {
1144        return isPermissableDestination(destination, false);
1145    }
1146
1147    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1148        // Are we not bridging temporary destinations?
1149        if (destination.isTemporary()) {
1150            if (allowTemporary) {
1151                return true;
1152            } else {
1153                return configuration.isBridgeTempDestinations();
1154            }
1155        }
1156
1157        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1158        if (dests != null && dests.length > 0) {
1159            for (ActiveMQDestination dest : dests) {
1160                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1161                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1162                    return true;
1163                }
1164            }
1165        }
1166
1167        dests = excludedDestinations;
1168        if (dests != null && dests.length > 0) {
1169            for (ActiveMQDestination dest : dests) {
1170                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1171                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1172                    return false;
1173                }
1174            }
1175        }
1176
1177        dests = dynamicallyIncludedDestinations;
1178        if (dests != null && dests.length > 0) {
1179            for (ActiveMQDestination dest : dests) {
1180                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1181                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1182                    return true;
1183                }
1184            }
1185
1186            return false;
1187        }
1188        return true;
1189    }
1190
1191    /**
1192     * Subscriptions for these destinations are always created
1193     */
1194    protected void setupStaticDestinations() {
1195        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1196        if (dests != null) {
1197            for (ActiveMQDestination dest : dests) {
1198                DemandSubscription sub = createDemandSubscription(dest);
1199                sub.setStaticallyIncluded(true);
1200                try {
1201                    addSubscription(sub);
1202                } catch (IOException e) {
1203                    LOG.error("Failed to add static destination {}", dest, e);
1204                }
1205                LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
1206            }
1207        }
1208    }
1209
1210    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1211        ConsumerInfo info = consumerInfo.copy();
1212        addRemoteBrokerToBrokerPath(info);
1213        DemandSubscription sub = createDemandSubscription(info);
1214        if (sub != null) {
1215            if (duplicateSuppressionIsRequired(sub)) {
1216                undoMapRegistration(sub);
1217            } else {
1218                if (consumerInfo.isDurable()) {
1219                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
1220                }
1221                addSubscription(sub);
1222                LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
1223            }
1224        }
1225    }
1226
1227    private void undoMapRegistration(DemandSubscription sub) {
1228        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1229        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1230    }
1231
1232    /*
1233     * check our existing subs networkConsumerIds against the list of network
1234     * ids in this subscription A match means a duplicate which we suppress for
1235     * topics and maybe for queues
1236     */
1237    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1238        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1239        boolean suppress = false;
1240
1241        if (isDuplicateSuppressionOff(consumerInfo)) {
1242            return suppress;
1243        }
1244
1245        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1246        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1247        for (Subscription sub : currentSubs) {
1248            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1249            if (!networkConsumers.isEmpty()) {
1250                if (matchFound(candidateConsumers, networkConsumers)) {
1251                    if (isInActiveDurableSub(sub)) {
1252                        suppress = false;
1253                    } else {
1254                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1255                    }
1256                    break;
1257                }
1258            }
1259        }
1260        return suppress;
1261    }
1262
1263    private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) {
1264        return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions()
1265                || consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()
1266                || consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions();
1267    }
1268
1269    private boolean isInActiveDurableSub(Subscription sub) {
1270        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1271    }
1272
1273    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1274        boolean suppress = false;
1275
1276        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1277            LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
1278                    configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
1279            });
1280            suppress = true;
1281        } else {
1282            // remove the existing lower priority duplicate and allow this candidate
1283            try {
1284                removeDuplicateSubscription(existingSub);
1285
1286                LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
1287                        configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
1288                });
1289            } catch (IOException e) {
1290                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
1291            }
1292        }
1293        return suppress;
1294    }
1295
1296    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1297        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1298            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1299                break;
1300            }
1301        }
1302    }
1303
1304    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1305        boolean found = false;
1306        for (ConsumerId aliasConsumer : networkConsumers) {
1307            if (candidateConsumers.contains(aliasConsumer)) {
1308                found = true;
1309                break;
1310            }
1311        }
1312        return found;
1313    }
1314
1315    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1316        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1317        Region region;
1318        Collection<Subscription> subs;
1319
1320        region = null;
1321        switch (dest.getDestinationType()) {
1322            case ActiveMQDestination.QUEUE_TYPE:
1323                region = region_broker.getQueueRegion();
1324                break;
1325            case ActiveMQDestination.TOPIC_TYPE:
1326                region = region_broker.getTopicRegion();
1327                break;
1328            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1329                region = region_broker.getTempQueueRegion();
1330                break;
1331            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1332                region = region_broker.getTempTopicRegion();
1333                break;
1334        }
1335
1336        if (region instanceof AbstractRegion) {
1337            subs = ((AbstractRegion) region).getSubscriptions().values();
1338        } else {
1339            subs = null;
1340        }
1341
1342        return subs;
1343    }
1344
1345    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1346        // add our original id to ourselves
1347        info.addNetworkConsumerId(info.getConsumerId());
1348        return doCreateDemandSubscription(info);
1349    }
1350
1351    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1352        DemandSubscription result = new DemandSubscription(info);
1353        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1354        if (info.getDestination().isTemporary()) {
1355            // reset the local connection Id
1356            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1357            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1358        }
1359
1360        if (configuration.isDecreaseNetworkConsumerPriority()) {
1361            byte priority = (byte) configuration.getConsumerPriorityBase();
1362            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1363                // The longer the path to the consumer, the less it's consumer priority.
1364                priority -= info.getBrokerPath().length + 1;
1365            }
1366            result.getLocalInfo().setPriority(priority);
1367            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
1368        }
1369        configureDemandSubscription(info, result);
1370        return result;
1371    }
1372
1373    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1374        ConsumerInfo info = new ConsumerInfo();
1375        info.setNetworkSubscription(true);
1376        info.setDestination(destination);
1377
1378        // Indicate that this subscription is being made on behalf of the remote broker.
1379        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
1380
1381        // the remote info held by the DemandSubscription holds the original
1382        // consumerId, the local info get's overwritten
1383        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1384        DemandSubscription result = null;
1385        try {
1386            result = createDemandSubscription(info);
1387        } catch (IOException e) {
1388            LOG.error("Failed to create DemandSubscription ", e);
1389        }
1390        return result;
1391    }
1392
1393    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1394        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
1395                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
1396            sub.getLocalInfo().setDispatchAsync(true);
1397        } else {
1398            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1399        }
1400        sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1401        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1402        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1403
1404        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1405        if (!info.isDurable()) {
1406            // This works for now since we use a VM connection to the local broker.
1407            // may need to change if we ever subscribe to a remote broker.
1408            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1409        } else {
1410            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
1411        }
1412    }
1413
1414    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1415        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1416        LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
1417                configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
1418        });
1419        if (sub != null) {
1420            removeSubscription(sub);
1421            LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
1422                    configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
1423            });
1424        }
1425    }
1426
1427    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1428        boolean removeDone = false;
1429        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1430        if (sub != null) {
1431            try {
1432                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1433                removeDone = true;
1434            } catch (IOException e) {
1435                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
1436            }
1437        }
1438        return removeDone;
1439    }
1440
1441    /**
1442     * Performs a timed wait on the started latch and then checks for disposed
1443     * before performing another wait each time the the started wait times out.
1444     */
1445    protected boolean safeWaitUntilStarted() throws InterruptedException {
1446        while (!disposed.get()) {
1447            if (startedLatch.await(1, TimeUnit.SECONDS)) {
1448                break;
1449            }
1450        }
1451        return !disposed.get();
1452    }
1453
1454    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1455        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1456        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1457            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1458            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1459                filterFactory = entry.getNetworkBridgeFilterFactory();
1460            }
1461        }
1462        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
1463    }
1464
1465    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1466        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1467    }
1468
    /**
     * @return the remote broker path array held by this bridge (the array
     *         itself, not a copy)
     */
    protected BrokerId[] getRemoteBrokerPath() {
        return remoteBrokerPath;
    }
1472
    /**
     * Sets the listener that this bridge notifies about bridge failure and
     * about inbound and outbound messages.
     *
     * @param listener the listener to use; may be {@code null}, in which case
     *         the notifying methods of this bridge do nothing
     */
    @Override
    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
        this.networkBridgeListener = listener;
    }
1477
1478    private void fireBridgeFailed(Throwable reason) {
1479        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
1480        NetworkBridgeListener l = this.networkBridgeListener;
1481        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1482            l.bridgeFailed();
1483        }
1484    }
1485
1486    /**
1487     * @return Returns the dynamicallyIncludedDestinations.
1488     */
1489    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1490        return dynamicallyIncludedDestinations;
1491    }
1492
1493    /**
1494     * @param dynamicallyIncludedDestinations
1495     *         The dynamicallyIncludedDestinations to set.
1496     */
1497    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1498        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1499    }
1500
1501    /**
1502     * @return Returns the excludedDestinations.
1503     */
1504    public ActiveMQDestination[] getExcludedDestinations() {
1505        return excludedDestinations;
1506    }
1507
1508    /**
1509     * @param excludedDestinations The excludedDestinations to set.
1510     */
1511    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1512        this.excludedDestinations = excludedDestinations;
1513    }
1514
1515    /**
1516     * @return Returns the staticallyIncludedDestinations.
1517     */
1518    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1519        return staticallyIncludedDestinations;
1520    }
1521
1522    /**
1523     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
1524     */
1525    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1526        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1527    }
1528
1529    /**
1530     * @return Returns the durableDestinations.
1531     */
1532    public ActiveMQDestination[] getDurableDestinations() {
1533        return durableDestinations;
1534    }
1535
1536    /**
1537     * @param durableDestinations The durableDestinations to set.
1538     */
1539    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1540        this.durableDestinations = durableDestinations;
1541    }
1542
1543    /**
1544     * @return Returns the localBroker.
1545     */
1546    public Transport getLocalBroker() {
1547        return localBroker;
1548    }
1549
1550    /**
1551     * @return Returns the remoteBroker.
1552     */
1553    public Transport getRemoteBroker() {
1554        return remoteBroker;
1555    }
1556
1557    /**
1558     * @return the createdByDuplex
1559     */
1560    public boolean isCreatedByDuplex() {
1561        return this.createdByDuplex;
1562    }
1563
1564    /**
1565     * @param createdByDuplex the createdByDuplex to set
1566     */
1567    public void setCreatedByDuplex(boolean createdByDuplex) {
1568        this.createdByDuplex = createdByDuplex;
1569    }
1570
    /**
     * @return the remote address reported by the remote broker transport
     */
    @Override
    public String getRemoteAddress() {
        return remoteBroker.getRemoteAddress();
    }
1575
    /**
     * @return the remote address reported by the local broker transport
     */
    // NOTE(review): this asks the local transport for its *remote* address;
    // presumably that is the intended "local address" of the bridge - confirm.
    @Override
    public String getLocalAddress() {
        return localBroker.getRemoteAddress();
    }
1580
1581    @Override
1582    public String getRemoteBrokerName() {
1583        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1584    }
1585
1586    @Override
1587    public String getRemoteBrokerId() {
1588        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
1589    }
1590
1591    @Override
1592    public String getLocalBrokerName() {
1593        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1594    }
1595
    /**
     * @return the current value of the dequeue counter
     */
    @Override
    public long getDequeueCounter() {
        return dequeueCounter.get();
    }
1600
    /**
     * @return the current value of the enqueue counter
     */
    @Override
    public long getEnqueueCounter() {
        return enqueueCounter.get();
    }
1605
1606    protected boolean isDuplex() {
1607        return configuration.isDuplex() || createdByDuplex;
1608    }
1609
    /**
     * @return the live map of demand subscriptions held in
     *         {@code subscriptionMapByRemoteId} (not a copy)
     */
    // NOTE(review): despite the method name, the map returned is the one keyed
    // by remote consumer id - confirm this is what callers expect.
    public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
        return subscriptionMapByRemoteId;
    }
1613
1614    @Override
1615    public void setBrokerService(BrokerService brokerService) {
1616        this.brokerService = brokerService;
1617        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1618        localBrokerPath[0] = localBrokerId;
1619    }
1620
    /**
     * @param objectName the object name to store in the mbeanObjectName field
     */
    @Override
    public void setMbeanObjectName(ObjectName objectName) {
        this.mbeanObjectName = objectName;
    }
1625
    /**
     * @return the object name stored in the mbeanObjectName field
     */
    @Override
    public ObjectName getMbeanObjectName() {
        return mbeanObjectName;
    }
1630
    /**
     * Resets the enqueue and dequeue counters to zero. The two counters are
     * reset one after the other, not as a single atomic step.
     */
    @Override
    public void resetStats() {
        enqueueCounter.set(0);
        dequeueCounter.set(0);
    }
1636
1637    /*
1638     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1639     * remote sides of the network bridge.
1640     */
1641    private static class FutureBrokerInfo implements Future<BrokerInfo> {
1642
1643        private final CountDownLatch slot = new CountDownLatch(1);
1644        private final AtomicBoolean disposed;
1645        private volatile BrokerInfo info = null;
1646
1647        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1648            this.info = info;
1649            this.disposed = disposed;
1650        }
1651
1652        @Override
1653        public boolean cancel(boolean mayInterruptIfRunning) {
1654            slot.countDown();
1655            return true;
1656        }
1657
1658        @Override
1659        public boolean isCancelled() {
1660            return slot.getCount() == 0 && info == null;
1661        }
1662
1663        @Override
1664        public boolean isDone() {
1665            return info != null;
1666        }
1667
1668        @Override
1669        public BrokerInfo get() throws InterruptedException, ExecutionException {
1670            try {
1671                if (info == null) {
1672                    while (!disposed.get()) {
1673                        if (slot.await(1, TimeUnit.SECONDS)) {
1674                            break;
1675                        }
1676                    }
1677                }
1678                return info;
1679            } catch (InterruptedException e) {
1680                Thread.currentThread().interrupt();
1681                LOG.debug("Operation interrupted: {}", e, e);
1682                throw new InterruptedException("Interrupted.");
1683            }
1684        }
1685
1686        @Override
1687        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1688            try {
1689                if (info == null) {
1690                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1691
1692                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
1693                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
1694                            break;
1695                        }
1696                    }
1697                    if (info == null) {
1698                        throw new TimeoutException();
1699                    }
1700                }
1701                return info;
1702            } catch (InterruptedException e) {
1703                throw new InterruptedException("Interrupted.");
1704            }
1705        }
1706
1707        public void set(BrokerInfo info) {
1708            this.info = info;
1709            this.slot.countDown();
1710        }
1711    }
1712
1713    protected void serviceOutbound(Message message) {
1714        NetworkBridgeListener l = this.networkBridgeListener;
1715        if (l != null) {
1716            l.onOutboundMessage(this, message);
1717        }
1718    }
1719
1720    protected void serviceInboundMessage(Message message) {
1721        NetworkBridgeListener l = this.networkBridgeListener;
1722        if (l != null) {
1723            l.onInboundMessage(this, message);
1724        }
1725    }
1726
1727    protected boolean canDuplexDispatch(Message message) {
1728        boolean result = true;
1729        if (configuration.isCheckDuplicateMessagesOnDuplex()){
1730            final long producerSequenceId = message.getMessageId().getProducerSequenceId();
1731            //  messages are multiplexed on this producer so we need to query the persistenceAdapter
1732            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
1733            if (producerSequenceId <= lastStoredForMessageProducer) {
1734                result = false;
1735                LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
1736                        (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
1737                });
1738            }
1739        }
1740        return result;
1741    }
1742
1743    protected long getStoredSequenceIdForMessage(MessageId messageId) {
1744        try {
1745            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
1746        } catch (IOException ignored) {
1747            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
1748        }
1749        return -1;
1750    }
1751
1752}