001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.LinkedList;
023import java.util.StringTokenizer;
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.regex.Pattern;
026
027import javax.management.ObjectName;
028
029import org.apache.activemq.broker.jmx.ManagedTransportConnector;
030import org.apache.activemq.broker.jmx.ManagementContext;
031import org.apache.activemq.broker.region.ConnectorStatistics;
032import org.apache.activemq.command.BrokerInfo;
033import org.apache.activemq.command.ConnectionControl;
034import org.apache.activemq.security.MessageAuthorizationPolicy;
035import org.apache.activemq.thread.TaskRunnerFactory;
036import org.apache.activemq.transport.Transport;
037import org.apache.activemq.transport.TransportAcceptListener;
038import org.apache.activemq.transport.TransportFactorySupport;
039import org.apache.activemq.transport.TransportServer;
040import org.apache.activemq.transport.discovery.DiscoveryAgent;
041import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
042import org.apache.activemq.util.ServiceStopper;
043import org.apache.activemq.util.ServiceSupport;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * @org.apache.xbean.XBean
049 */
050public class TransportConnector implements Connector, BrokerServiceAware {
051
052    final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
053
054    protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
055    protected TransportStatusDetector statusDector;
056    private BrokerService brokerService;
057    private TransportServer server;
058    private URI uri;
059    private BrokerInfo brokerInfo = new BrokerInfo();
060    private TaskRunnerFactory taskRunnerFactory;
061    private MessageAuthorizationPolicy messageAuthorizationPolicy;
062    private DiscoveryAgent discoveryAgent;
063    private final ConnectorStatistics statistics = new ConnectorStatistics();
064    private URI discoveryUri;
065    private String name;
066    private boolean disableAsyncDispatch;
067    private boolean enableStatusMonitor = false;
068    private Broker broker;
069    private boolean updateClusterClients = false;
070    private boolean rebalanceClusterClients;
071    private boolean updateClusterClientsOnRemove = false;
072    private String updateClusterFilter;
073    private boolean auditNetworkProducers = false;
074    private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
075    private int maximumConsumersAllowedPerConnection  = Integer.MAX_VALUE;
076    private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
077    private boolean warnOnRemoteClose = false;
078
079    LinkedList<String> peerBrokers = new LinkedList<String>();
080
081    public TransportConnector() {
082    }
083
084    public TransportConnector(TransportServer server) {
085        this();
086        setServer(server);
087        if (server != null && server.getConnectURI() != null) {
088            URI uri = server.getConnectURI();
089            if (uri != null && uri.getScheme().equals("vm")) {
090                setEnableStatusMonitor(false);
091            }
092        }
093    }
094
095    /**
096     * @return Returns the connections.
097     */
098    public CopyOnWriteArrayList<TransportConnection> getConnections() {
099        return connections;
100    }
101
102    /**
103     * Factory method to create a JMX managed version of this transport
104     * connector
105     */
106    public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
107        ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
108        rc.setBrokerInfo(getBrokerInfo());
109        rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
110        rc.setDiscoveryAgent(getDiscoveryAgent());
111        rc.setDiscoveryUri(getDiscoveryUri());
112        rc.setEnableStatusMonitor(isEnableStatusMonitor());
113        rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
114        rc.setName(getName());
115        rc.setTaskRunnerFactory(getTaskRunnerFactory());
116        rc.setUri(getUri());
117        rc.setBrokerService(brokerService);
118        rc.setUpdateClusterClients(isUpdateClusterClients());
119        rc.setRebalanceClusterClients(isRebalanceClusterClients());
120        rc.setUpdateClusterFilter(getUpdateClusterFilter());
121        rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
122        rc.setAuditNetworkProducers(isAuditNetworkProducers());
123        rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
124        rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
125        rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
126        rc.setWarnOnRemoteClose(isWarnOnRemoteClose());
127        return rc;
128    }
129
130    @Override
131    public BrokerInfo getBrokerInfo() {
132        return brokerInfo;
133    }
134
135    public void setBrokerInfo(BrokerInfo brokerInfo) {
136        this.brokerInfo = brokerInfo;
137    }
138
139    public TransportServer getServer() throws IOException, URISyntaxException {
140        if (server == null) {
141            setServer(createTransportServer());
142        }
143        return server;
144    }
145
146    public void setServer(TransportServer server) {
147        this.server = server;
148    }
149
150    public URI getUri() {
151        if (uri == null) {
152            try {
153                uri = getConnectUri();
154            } catch (Throwable e) {
155            }
156        }
157        return uri;
158    }
159
160    /**
161     * Sets the server transport URI to use if there is not a
162     * {@link TransportServer} configured via the
163     * {@link #setServer(TransportServer)} method. This value is used to lazy
164     * create a {@link TransportServer} instance
165     *
166     * @param uri
167     */
168    public void setUri(URI uri) {
169        this.uri = uri;
170    }
171
172    public TaskRunnerFactory getTaskRunnerFactory() {
173        return taskRunnerFactory;
174    }
175
176    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
177        this.taskRunnerFactory = taskRunnerFactory;
178    }
179
180    /**
181     * @return the statistics for this connector
182     */
183    @Override
184    public ConnectorStatistics getStatistics() {
185        return statistics;
186    }
187
188    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
189        return messageAuthorizationPolicy;
190    }
191
192    /**
193     * Sets the policy used to decide if the current connection is authorized to
194     * consume a given message
195     */
196    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
197        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
198    }
199
200    @Override
201    public void start() throws Exception {
202        broker = brokerService.getBroker();
203        brokerInfo.setBrokerName(broker.getBrokerName());
204        brokerInfo.setBrokerId(broker.getBrokerId());
205        brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
206        brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
207        brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
208        getServer().setAcceptListener(new TransportAcceptListener() {
209            @Override
210            public void onAccept(final Transport transport) {
211                try {
212                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
213                        @Override
214                        public void run() {
215                            try {
216                                if (!brokerService.isStopping()) {
217                                    Connection connection = createConnection(transport);
218                                    connection.start();
219                                } else {
220                                    throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
221                                }
222                            } catch (Exception e) {
223                                String remoteHost = transport.getRemoteAddress();
224                                ServiceSupport.dispose(transport);
225                                onAcceptError(e, remoteHost);
226                            }
227                        }
228                    });
229                } catch (Exception e) {
230                    String remoteHost = transport.getRemoteAddress();
231                    ServiceSupport.dispose(transport);
232                    onAcceptError(e, remoteHost);
233                }
234            }
235
236            @Override
237            public void onAcceptError(Exception error) {
238                onAcceptError(error, null);
239            }
240
241            private void onAcceptError(Exception error, String remoteHost) {
242                LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
243                        + error);
244                LOG.debug("Reason: " + error, error);
245            }
246        });
247        getServer().setBrokerInfo(brokerInfo);
248        getServer().start();
249
250        DiscoveryAgent da = getDiscoveryAgent();
251        if (da != null) {
252            da.registerService(getPublishableConnectString());
253            da.start();
254        }
255        if (enableStatusMonitor) {
256            this.statusDector = new TransportStatusDetector(this);
257            this.statusDector.start();
258        }
259
260        LOG.info("Connector {} started", getName());
261    }
262
263    public String getPublishableConnectString() throws Exception {
264        String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this);
265        LOG.debug("Publishing: {} for broker transport URI: {}", publishableConnectString, getConnectUri());
266        return publishableConnectString;
267    }
268
269    public URI getPublishableConnectURI() throws Exception {
270        return publishedAddressPolicy.getPublishableConnectURI(this);
271    }
272
273    @Override
274    public void stop() throws Exception {
275        ServiceStopper ss = new ServiceStopper();
276        if (discoveryAgent != null) {
277            ss.stop(discoveryAgent);
278        }
279        if (server != null) {
280            ss.stop(server);
281        }
282        if (this.statusDector != null) {
283            this.statusDector.stop();
284        }
285
286        for (TransportConnection connection : connections) {
287            ss.stop(connection);
288        }
289        server = null;
290        ss.throwFirstException();
291        LOG.info("Connector {} stopped", getName());
292    }
293
294    // Implementation methods
295    // -------------------------------------------------------------------------
296    protected Connection createConnection(Transport transport) throws IOException {
297        // prefer to use task runner from broker service as stop task runner, as we can then
298        // tie it to the lifecycle of the broker service
299        TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
300                : taskRunnerFactory, brokerService.getTaskRunnerFactory());
301        boolean statEnabled = this.getStatistics().isEnabled();
302        answer.getStatistics().setEnabled(statEnabled);
303        answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
304        return answer;
305    }
306
307    protected TransportServer createTransportServer() throws IOException, URISyntaxException {
308        if (uri == null) {
309            throw new IllegalArgumentException("You must specify either a server or uri property");
310        }
311        if (brokerService == null) {
312            throw new IllegalArgumentException(
313                    "You must specify the brokerService property. Maybe this connector should be added to a broker?");
314        }
315        return TransportFactorySupport.bind(brokerService, uri);
316    }
317
318    public DiscoveryAgent getDiscoveryAgent() throws IOException {
319        if (discoveryAgent == null) {
320            discoveryAgent = createDiscoveryAgent();
321        }
322        return discoveryAgent;
323    }
324
325    protected DiscoveryAgent createDiscoveryAgent() throws IOException {
326        if (discoveryUri != null) {
327            DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
328
329            if (agent != null && agent instanceof BrokerServiceAware) {
330                ((BrokerServiceAware) agent).setBrokerService(brokerService);
331            }
332
333            return agent;
334        }
335        return null;
336    }
337
338    public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
339        this.discoveryAgent = discoveryAgent;
340    }
341
342    public URI getDiscoveryUri() {
343        return discoveryUri;
344    }
345
346    public void setDiscoveryUri(URI discoveryUri) {
347        this.discoveryUri = discoveryUri;
348    }
349
350    public URI getConnectUri() throws IOException, URISyntaxException {
351        if (server != null) {
352            return server.getConnectURI();
353        } else {
354            return uri;
355        }
356    }
357
358    public void onStarted(TransportConnection connection) {
359        connections.add(connection);
360    }
361
362    public void onStopped(TransportConnection connection) {
363        connections.remove(connection);
364    }
365
366    public String getName() {
367        if (name == null) {
368            uri = getUri();
369            if (uri != null) {
370                name = uri.toString();
371            }
372        }
373        return name;
374    }
375
376    public void setName(String name) {
377        this.name = name;
378    }
379
380    @Override
381    public String toString() {
382        String rc = getName();
383        if (rc == null) {
384            rc = super.toString();
385        }
386        return rc;
387    }
388
389    protected ConnectionControl getConnectionControl() {
390        boolean rebalance = isRebalanceClusterClients();
391        String connectedBrokers = "";
392        String separator = "";
393
394        if (isUpdateClusterClients()) {
395            synchronized (peerBrokers) {
396                for (String uri : getPeerBrokers()) {
397                    connectedBrokers += separator + uri;
398                    separator = ",";
399                }
400
401                if (rebalance) {
402                    String shuffle = peerBrokers.removeFirst();
403                    peerBrokers.addLast(shuffle);
404                }
405            }
406        }
407        ConnectionControl control = new ConnectionControl();
408        control.setConnectedBrokers(connectedBrokers);
409        control.setRebalanceConnection(rebalance);
410        return control;
411    }
412
413    public void addPeerBroker(BrokerInfo info) {
414        if (isMatchesClusterFilter(info.getBrokerName())) {
415            synchronized (peerBrokers) {
416                getPeerBrokers().addLast(info.getBrokerURL());
417            }
418        }
419    }
420
421    public void removePeerBroker(BrokerInfo info) {
422        synchronized (peerBrokers) {
423            getPeerBrokers().remove(info.getBrokerURL());
424        }
425    }
426
427    public LinkedList<String> getPeerBrokers() {
428        synchronized (peerBrokers) {
429            if (peerBrokers.isEmpty()) {
430                peerBrokers.add(brokerService.getDefaultSocketURIString());
431            }
432            return peerBrokers;
433        }
434    }
435
436    @Override
437    public void updateClientClusterInfo() {
438        if (isRebalanceClusterClients() || isUpdateClusterClients()) {
439            ConnectionControl control = getConnectionControl();
440            for (Connection c : this.connections) {
441                c.updateClient(control);
442                if (isRebalanceClusterClients()) {
443                    control = getConnectionControl();
444                }
445            }
446        }
447    }
448
449    private boolean isMatchesClusterFilter(String brokerName) {
450        boolean result = true;
451        String filter = getUpdateClusterFilter();
452        if (filter != null) {
453            filter = filter.trim();
454            if (filter.length() > 0) {
455                result = false;
456                StringTokenizer tokenizer = new StringTokenizer(filter, ",");
457                while (!result && tokenizer.hasMoreTokens()) {
458                    String token = tokenizer.nextToken();
459                    result = isMatchesClusterFilter(brokerName, token);
460                }
461            }
462        }
463
464        return result;
465    }
466
467    private boolean isMatchesClusterFilter(String brokerName, String match) {
468        boolean result = false;
469        if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
470            result = Pattern.matches(match, brokerName);
471        }
472        return result;
473    }
474
475    public boolean isDisableAsyncDispatch() {
476        return disableAsyncDispatch;
477    }
478
479    public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
480        this.disableAsyncDispatch = disableAsyncDispatch;
481    }
482
483    /**
484     * @return the enableStatusMonitor
485     */
486    public boolean isEnableStatusMonitor() {
487        return enableStatusMonitor;
488    }
489
490    /**
491     * @param enableStatusMonitor
492     *            the enableStatusMonitor to set
493     */
494    public void setEnableStatusMonitor(boolean enableStatusMonitor) {
495        this.enableStatusMonitor = enableStatusMonitor;
496    }
497
498    /**
499     * This is called by the BrokerService right before it starts the transport.
500     */
501    @Override
502    public void setBrokerService(BrokerService brokerService) {
503        this.brokerService = brokerService;
504    }
505
506    public Broker getBroker() {
507        return broker;
508    }
509
510    public BrokerService getBrokerService() {
511        return brokerService;
512    }
513
514    /**
515     * @return the updateClusterClients
516     */
517    @Override
518    public boolean isUpdateClusterClients() {
519        return this.updateClusterClients;
520    }
521
522    /**
523     * @param updateClusterClients
524     *            the updateClusterClients to set
525     */
526    public void setUpdateClusterClients(boolean updateClusterClients) {
527        this.updateClusterClients = updateClusterClients;
528    }
529
530    /**
531     * @return the rebalanceClusterClients
532     */
533    @Override
534    public boolean isRebalanceClusterClients() {
535        return this.rebalanceClusterClients;
536    }
537
538    /**
539     * @param rebalanceClusterClients
540     *            the rebalanceClusterClients to set
541     */
542    public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
543        this.rebalanceClusterClients = rebalanceClusterClients;
544    }
545
546    /**
547     * @return the updateClusterClientsOnRemove
548     */
549    @Override
550    public boolean isUpdateClusterClientsOnRemove() {
551        return this.updateClusterClientsOnRemove;
552    }
553
554    /**
555     * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
556     */
557    public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
558        this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
559    }
560
561    /**
562     * @return the updateClusterFilter
563     */
564    public String getUpdateClusterFilter() {
565        return this.updateClusterFilter;
566    }
567
568    /**
569     * @param updateClusterFilter
570     *            the updateClusterFilter to set
571     */
572    public void setUpdateClusterFilter(String updateClusterFilter) {
573        this.updateClusterFilter = updateClusterFilter;
574    }
575
576    @Override
577    public int connectionCount() {
578        return connections.size();
579    }
580
581    @Override
582    public boolean isAllowLinkStealing() {
583        return server.isAllowLinkStealing();
584    }
585
586    public boolean isAuditNetworkProducers() {
587        return auditNetworkProducers;
588    }
589
590    /**
591     * Enable a producer audit on network connections, Traps the case of a missing send reply and resend.
592     * Note: does not work with conduit=false, networked composite destinations or networked virtual topics
593     * @param auditNetworkProducers
594     */
595    public void setAuditNetworkProducers(boolean auditNetworkProducers) {
596        this.auditNetworkProducers = auditNetworkProducers;
597    }
598
599    public int getMaximumProducersAllowedPerConnection() {
600        return maximumProducersAllowedPerConnection;
601    }
602
603    public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
604        this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
605    }
606
607    public int getMaximumConsumersAllowedPerConnection() {
608        return maximumConsumersAllowedPerConnection;
609    }
610
611    public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
612        this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
613    }
614
615    /**
616     * Gets the currently configured policy for creating the published connection address of this
617     * TransportConnector.
618     *
619     * @return the publishedAddressPolicy
620     */
621    public PublishedAddressPolicy getPublishedAddressPolicy() {
622        return publishedAddressPolicy;
623    }
624
625    /**
626     * Sets the configured policy for creating the published connection address of this
627     * TransportConnector.
628     *
629     * @return the publishedAddressPolicy
630     */
631    public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) {
632        this.publishedAddressPolicy = publishedAddressPolicy;
633    }
634
635    public boolean isWarnOnRemoteClose() {
636        return warnOnRemoteClose;
637    }
638
639    public void setWarnOnRemoteClose(boolean warnOnRemoteClose) {
640        this.warnOnRemoteClose = warnOnRemoteClose;
641    }
642}