001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.LinkedList;
023import java.util.StringTokenizer;
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.regex.Pattern;
026
027import javax.management.ObjectName;
028
029import org.apache.activemq.broker.jmx.ManagedTransportConnector;
030import org.apache.activemq.broker.jmx.ManagementContext;
031import org.apache.activemq.broker.region.ConnectorStatistics;
032import org.apache.activemq.command.BrokerInfo;
033import org.apache.activemq.command.ConnectionControl;
034import org.apache.activemq.security.MessageAuthorizationPolicy;
035import org.apache.activemq.thread.TaskRunnerFactory;
036import org.apache.activemq.transport.Transport;
037import org.apache.activemq.transport.TransportAcceptListener;
038import org.apache.activemq.transport.TransportFactorySupport;
039import org.apache.activemq.transport.TransportServer;
040import org.apache.activemq.transport.discovery.DiscoveryAgent;
041import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
042import org.apache.activemq.util.ServiceStopper;
043import org.apache.activemq.util.ServiceSupport;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * @org.apache.xbean.XBean
049 */
050public class TransportConnector implements Connector, BrokerServiceAware {
051
052    final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
053
054    protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
055    protected TransportStatusDetector statusDector;
056    private BrokerService brokerService;
057    private TransportServer server;
058    private URI uri;
059    private BrokerInfo brokerInfo = new BrokerInfo();
060    private TaskRunnerFactory taskRunnerFactory;
061    private MessageAuthorizationPolicy messageAuthorizationPolicy;
062    private DiscoveryAgent discoveryAgent;
063    private final ConnectorStatistics statistics = new ConnectorStatistics();
064    private URI discoveryUri;
065    private String name;
066    private boolean disableAsyncDispatch;
067    private boolean enableStatusMonitor = false;
068    private Broker broker;
069    private boolean updateClusterClients = false;
070    private boolean rebalanceClusterClients;
071    private boolean updateClusterClientsOnRemove = false;
072    private String updateClusterFilter;
073    private boolean auditNetworkProducers = false;
074    private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
075    private int maximumConsumersAllowedPerConnection  = Integer.MAX_VALUE;
076    private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
077    private boolean warnOnRemoteClose = false;
078
079    LinkedList<String> peerBrokers = new LinkedList<String>();
080
081    public TransportConnector() {
082    }
083
084    public TransportConnector(TransportServer server) {
085        this();
086        setServer(server);
087        if (server != null && server.getConnectURI() != null) {
088            URI uri = server.getConnectURI();
089            if (uri != null && uri.getScheme().equals("vm")) {
090                setEnableStatusMonitor(false);
091            }
092        }
093    }
094
095    /**
096     * @return Returns the connections.
097     */
098    public CopyOnWriteArrayList<TransportConnection> getConnections() {
099        return connections;
100    }
101
102    /**
103     * Factory method to create a JMX managed version of this transport
104     * connector
105     */
106    public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
107        ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
108        rc.setBrokerInfo(getBrokerInfo());
109        rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
110        rc.setDiscoveryAgent(getDiscoveryAgent());
111        rc.setDiscoveryUri(getDiscoveryUri());
112        rc.setEnableStatusMonitor(isEnableStatusMonitor());
113        rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
114        rc.setName(getName());
115        rc.setTaskRunnerFactory(getTaskRunnerFactory());
116        rc.setUri(getUri());
117        rc.setBrokerService(brokerService);
118        rc.setUpdateClusterClients(isUpdateClusterClients());
119        rc.setRebalanceClusterClients(isRebalanceClusterClients());
120        rc.setUpdateClusterFilter(getUpdateClusterFilter());
121        rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
122        rc.setAuditNetworkProducers(isAuditNetworkProducers());
123        rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
124        rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
125        rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
126        rc.setWarnOnRemoteClose(isWarnOnRemoteClose());
127        return rc;
128    }
129
130    @Override
131    public BrokerInfo getBrokerInfo() {
132        return brokerInfo;
133    }
134
135    public void setBrokerInfo(BrokerInfo brokerInfo) {
136        this.brokerInfo = brokerInfo;
137    }
138
139    public TransportServer getServer() throws IOException, URISyntaxException {
140        if (server == null) {
141            setServer(createTransportServer());
142        }
143        return server;
144    }
145
146    public void setServer(TransportServer server) {
147        this.server = server;
148    }
149
150    public URI getUri() {
151        if (uri == null) {
152            try {
153                uri = getConnectUri();
154            } catch (Throwable e) {
155            }
156        }
157        return uri;
158    }
159
160    /**
161     * Sets the server transport URI to use if there is not a
162     * {@link TransportServer} configured via the
163     * {@link #setServer(TransportServer)} method. This value is used to lazy
164     * create a {@link TransportServer} instance
165     *
166     * @param uri
167     */
168    public void setUri(URI uri) {
169        this.uri = uri;
170    }
171
172    public TaskRunnerFactory getTaskRunnerFactory() {
173        return taskRunnerFactory;
174    }
175
176    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
177        this.taskRunnerFactory = taskRunnerFactory;
178    }
179
180    /**
181     * @return the statistics for this connector
182     */
183    @Override
184    public ConnectorStatistics getStatistics() {
185        return statistics;
186    }
187
188    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
189        return messageAuthorizationPolicy;
190    }
191
192    /**
193     * Sets the policy used to decide if the current connection is authorized to
194     * consume a given message
195     */
196    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
197        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
198    }
199
200    @Override
201    public void start() throws Exception {
202        broker = brokerService.getBroker();
203        brokerInfo.setBrokerName(broker.getBrokerName());
204        brokerInfo.setBrokerId(broker.getBrokerId());
205        brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
206        brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
207        brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
208        getServer().setAcceptListener(new TransportAcceptListener() {
209            @Override
210            public void onAccept(final Transport transport) {
211                final String remoteHost = transport.getRemoteAddress();
212                try {
213                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
214                        @Override
215                        public void run() {
216                            try {
217                                if (!brokerService.isStopping()) {
218                                    Connection connection = createConnection(transport);
219                                    connection.start();
220                                } else {
221                                    throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
222                                }
223                            } catch (Exception e) {
224                                ServiceSupport.dispose(transport);
225                                onAcceptError(e, remoteHost);
226                            }
227                        }
228                    });
229                } catch (Exception e) {
230                    ServiceSupport.dispose(transport);
231                    onAcceptError(e, remoteHost);
232                }
233            }
234
235            @Override
236            public void onAcceptError(Exception error) {
237                onAcceptError(error, null);
238            }
239
240            private void onAcceptError(Exception error, String remoteHost) {
241                LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
242                        + error);
243                LOG.debug("Reason: " + error, error);
244            }
245        });
246        getServer().setBrokerInfo(brokerInfo);
247        getServer().start();
248
249        DiscoveryAgent da = getDiscoveryAgent();
250        if (da != null) {
251            da.registerService(getPublishableConnectString());
252            da.start();
253        }
254        if (enableStatusMonitor) {
255            this.statusDector = new TransportStatusDetector(this);
256            this.statusDector.start();
257        }
258
259        LOG.info("Connector {} started", getName());
260    }
261
262    public String getPublishableConnectString() throws Exception {
263        String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this);
264        LOG.debug("Publishing: {} for broker transport URI: {}", publishableConnectString, getConnectUri());
265        return publishableConnectString;
266    }
267
268    public URI getPublishableConnectURI() throws Exception {
269        return publishedAddressPolicy.getPublishableConnectURI(this);
270    }
271
272    @Override
273    public void stop() throws Exception {
274        ServiceStopper ss = new ServiceStopper();
275        if (discoveryAgent != null) {
276            ss.stop(discoveryAgent);
277        }
278        if (server != null) {
279            ss.stop(server);
280        }
281        if (this.statusDector != null) {
282            this.statusDector.stop();
283        }
284
285        for (TransportConnection connection : connections) {
286            ss.stop(connection);
287        }
288        server = null;
289        ss.throwFirstException();
290        LOG.info("Connector {} stopped", getName());
291    }
292
293    // Implementation methods
294    // -------------------------------------------------------------------------
295    protected Connection createConnection(Transport transport) throws IOException {
296        // prefer to use task runner from broker service as stop task runner, as we can then
297        // tie it to the lifecycle of the broker service
298        TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
299                : taskRunnerFactory, brokerService.getTaskRunnerFactory());
300        boolean statEnabled = this.getStatistics().isEnabled();
301        answer.getStatistics().setEnabled(statEnabled);
302        answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
303        return answer;
304    }
305
306    protected TransportServer createTransportServer() throws IOException, URISyntaxException {
307        if (uri == null) {
308            throw new IllegalArgumentException("You must specify either a server or uri property");
309        }
310        if (brokerService == null) {
311            throw new IllegalArgumentException(
312                    "You must specify the brokerService property. Maybe this connector should be added to a broker?");
313        }
314        return TransportFactorySupport.bind(brokerService, uri);
315    }
316
317    public DiscoveryAgent getDiscoveryAgent() throws IOException {
318        if (discoveryAgent == null) {
319            discoveryAgent = createDiscoveryAgent();
320        }
321        return discoveryAgent;
322    }
323
324    protected DiscoveryAgent createDiscoveryAgent() throws IOException {
325        if (discoveryUri != null) {
326            DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
327
328            if (agent != null && agent instanceof BrokerServiceAware) {
329                ((BrokerServiceAware) agent).setBrokerService(brokerService);
330            }
331
332            return agent;
333        }
334        return null;
335    }
336
337    public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
338        this.discoveryAgent = discoveryAgent;
339    }
340
341    public URI getDiscoveryUri() {
342        return discoveryUri;
343    }
344
345    public void setDiscoveryUri(URI discoveryUri) {
346        this.discoveryUri = discoveryUri;
347    }
348
349    public URI getConnectUri() throws IOException, URISyntaxException {
350        if (server != null) {
351            return server.getConnectURI();
352        } else {
353            return uri;
354        }
355    }
356
357    public void onStarted(TransportConnection connection) {
358        connections.add(connection);
359    }
360
361    public void onStopped(TransportConnection connection) {
362        connections.remove(connection);
363    }
364
365    public String getName() {
366        if (name == null) {
367            uri = getUri();
368            if (uri != null) {
369                name = uri.toString();
370            }
371        }
372        return name;
373    }
374
375    public void setName(String name) {
376        this.name = name;
377    }
378
379    @Override
380    public String toString() {
381        String rc = getName();
382        if (rc == null) {
383            rc = super.toString();
384        }
385        return rc;
386    }
387
388    protected ConnectionControl getConnectionControl() {
389        boolean rebalance = isRebalanceClusterClients();
390        String connectedBrokers = "";
391        String separator = "";
392
393        if (isUpdateClusterClients()) {
394            synchronized (peerBrokers) {
395                for (String uri : getPeerBrokers()) {
396                    connectedBrokers += separator + uri;
397                    separator = ",";
398                }
399
400                if (rebalance) {
401                    String shuffle = peerBrokers.removeFirst();
402                    peerBrokers.addLast(shuffle);
403                }
404            }
405        }
406        ConnectionControl control = new ConnectionControl();
407        control.setConnectedBrokers(connectedBrokers);
408        control.setRebalanceConnection(rebalance);
409        return control;
410    }
411
412    public void addPeerBroker(BrokerInfo info) {
413        if (isMatchesClusterFilter(info.getBrokerName())) {
414            synchronized (peerBrokers) {
415                getPeerBrokers().addLast(info.getBrokerURL());
416            }
417        }
418    }
419
420    public void removePeerBroker(BrokerInfo info) {
421        synchronized (peerBrokers) {
422            getPeerBrokers().remove(info.getBrokerURL());
423        }
424    }
425
426    public LinkedList<String> getPeerBrokers() {
427        synchronized (peerBrokers) {
428            if (peerBrokers.isEmpty()) {
429                peerBrokers.add(brokerService.getDefaultSocketURIString());
430            }
431            return peerBrokers;
432        }
433    }
434
435    @Override
436    public void updateClientClusterInfo() {
437        if (isRebalanceClusterClients() || isUpdateClusterClients()) {
438            ConnectionControl control = getConnectionControl();
439            for (Connection c : this.connections) {
440                c.updateClient(control);
441                if (isRebalanceClusterClients()) {
442                    control = getConnectionControl();
443                }
444            }
445        }
446    }
447
448    private boolean isMatchesClusterFilter(String brokerName) {
449        boolean result = true;
450        String filter = getUpdateClusterFilter();
451        if (filter != null) {
452            filter = filter.trim();
453            if (filter.length() > 0) {
454                result = false;
455                StringTokenizer tokenizer = new StringTokenizer(filter, ",");
456                while (!result && tokenizer.hasMoreTokens()) {
457                    String token = tokenizer.nextToken();
458                    result = isMatchesClusterFilter(brokerName, token);
459                }
460            }
461        }
462
463        return result;
464    }
465
466    private boolean isMatchesClusterFilter(String brokerName, String match) {
467        boolean result = false;
468        if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
469            result = Pattern.matches(match, brokerName);
470        }
471        return result;
472    }
473
474    public boolean isDisableAsyncDispatch() {
475        return disableAsyncDispatch;
476    }
477
478    public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
479        this.disableAsyncDispatch = disableAsyncDispatch;
480    }
481
482    /**
483     * @return the enableStatusMonitor
484     */
485    public boolean isEnableStatusMonitor() {
486        return enableStatusMonitor;
487    }
488
489    /**
490     * @param enableStatusMonitor
491     *            the enableStatusMonitor to set
492     */
493    public void setEnableStatusMonitor(boolean enableStatusMonitor) {
494        this.enableStatusMonitor = enableStatusMonitor;
495    }
496
497    /**
498     * This is called by the BrokerService right before it starts the transport.
499     */
500    @Override
501    public void setBrokerService(BrokerService brokerService) {
502        this.brokerService = brokerService;
503    }
504
505    public Broker getBroker() {
506        return broker;
507    }
508
509    public BrokerService getBrokerService() {
510        return brokerService;
511    }
512
513    /**
514     * @return the updateClusterClients
515     */
516    @Override
517    public boolean isUpdateClusterClients() {
518        return this.updateClusterClients;
519    }
520
521    /**
522     * @param updateClusterClients
523     *            the updateClusterClients to set
524     */
525    public void setUpdateClusterClients(boolean updateClusterClients) {
526        this.updateClusterClients = updateClusterClients;
527    }
528
529    /**
530     * @return the rebalanceClusterClients
531     */
532    @Override
533    public boolean isRebalanceClusterClients() {
534        return this.rebalanceClusterClients;
535    }
536
537    /**
538     * @param rebalanceClusterClients
539     *            the rebalanceClusterClients to set
540     */
541    public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
542        this.rebalanceClusterClients = rebalanceClusterClients;
543    }
544
545    /**
546     * @return the updateClusterClientsOnRemove
547     */
548    @Override
549    public boolean isUpdateClusterClientsOnRemove() {
550        return this.updateClusterClientsOnRemove;
551    }
552
553    /**
554     * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
555     */
556    public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
557        this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
558    }
559
560    /**
561     * @return the updateClusterFilter
562     */
563    public String getUpdateClusterFilter() {
564        return this.updateClusterFilter;
565    }
566
567    /**
568     * @param updateClusterFilter
569     *            the updateClusterFilter to set
570     */
571    public void setUpdateClusterFilter(String updateClusterFilter) {
572        this.updateClusterFilter = updateClusterFilter;
573    }
574
575    @Override
576    public int connectionCount() {
577        return connections.size();
578    }
579
580    @Override
581    public boolean isAllowLinkStealing() {
582        return server.isAllowLinkStealing();
583    }
584
585    public boolean isAuditNetworkProducers() {
586        return auditNetworkProducers;
587    }
588
589    /**
590     * Enable a producer audit on network connections, Traps the case of a missing send reply and resend.
591     * Note: does not work with conduit=false, networked composite destinations or networked virtual topics
592     * @param auditNetworkProducers
593     */
594    public void setAuditNetworkProducers(boolean auditNetworkProducers) {
595        this.auditNetworkProducers = auditNetworkProducers;
596    }
597
598    public int getMaximumProducersAllowedPerConnection() {
599        return maximumProducersAllowedPerConnection;
600    }
601
602    public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
603        this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
604    }
605
606    public int getMaximumConsumersAllowedPerConnection() {
607        return maximumConsumersAllowedPerConnection;
608    }
609
610    public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
611        this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
612    }
613
614    /**
615     * Gets the currently configured policy for creating the published connection address of this
616     * TransportConnector.
617     *
618     * @return the publishedAddressPolicy
619     */
620    public PublishedAddressPolicy getPublishedAddressPolicy() {
621        return publishedAddressPolicy;
622    }
623
624    /**
625     * Sets the configured policy for creating the published connection address of this
626     * TransportConnector.
627     *
628     * @return the publishedAddressPolicy
629     */
630    public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) {
631        this.publishedAddressPolicy = publishedAddressPolicy;
632    }
633
634    public boolean isWarnOnRemoteClose() {
635        return warnOnRemoteClose;
636    }
637
638    public void setWarnOnRemoteClose(boolean warnOnRemoteClose) {
639        this.warnOnRemoteClose = warnOnRemoteClose;
640    }
641}