001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.net.URI;
020import java.util.Map;
021import java.util.Set;
022import java.util.concurrent.ThreadPoolExecutor;
023
024import org.apache.activemq.Service;
025import org.apache.activemq.broker.region.Destination;
026import org.apache.activemq.broker.region.MessageReference;
027import org.apache.activemq.broker.region.Region;
028import org.apache.activemq.broker.region.Subscription;
029import org.apache.activemq.broker.region.virtual.VirtualDestination;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.BrokerId;
032import org.apache.activemq.command.BrokerInfo;
033import org.apache.activemq.command.ConnectionInfo;
034import org.apache.activemq.command.DestinationInfo;
035import org.apache.activemq.command.MessageDispatch;
036import org.apache.activemq.command.ProducerInfo;
037import org.apache.activemq.command.SessionInfo;
038import org.apache.activemq.command.TransactionId;
039import org.apache.activemq.store.PListStore;
040import org.apache.activemq.thread.Scheduler;
041import org.apache.activemq.usage.Usage;
042
043/**
044 * The Message Broker which routes messages, maintains subscriptions and
045 * connections, acknowledges messages and handles transactions.
046 *
047 *
048 */
049public interface Broker extends Region, Service {
050
051    /**
052     * Get a Broker from the Broker Stack that is a particular class
053     *
054     * @param type
055     * @return
056     */
057    Broker getAdaptor(Class type);
058
059    /**
060     * Get the id of the broker
061     */
062    BrokerId getBrokerId();
063
064    /**
065     * Get the name of the broker
066     */
067    String getBrokerName();
068
069    /**
070     * A remote Broker connects
071     */
072    void addBroker(Connection connection, BrokerInfo info);
073
074    /**
075     * Remove a BrokerInfo
076     *
077     * @param connection
078     * @param info
079     */
080    void removeBroker(Connection connection, BrokerInfo info);
081
082    /**
083     * A client is establishing a connection with the broker.
084     *
085     * @throws Exception TODO
086     */
087    void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception;
088
089    /**
090     * A client is disconnecting from the broker.
091     *
092     * @param context the environment the operation is being executed under.
093     * @param info
094     * @param error null if the client requested the disconnect or the error
095     *                that caused the client to disconnect.
096     * @throws Exception TODO
097     */
098    void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception;
099
100    /**
101     * Adds a session.
102     *
103     * @param context
104     * @param info
105     * @throws Exception TODO
106     */
107    void addSession(ConnectionContext context, SessionInfo info) throws Exception;
108
109    /**
110     * Removes a session.
111     *
112     * @param context
113     * @param info
114     * @throws Exception TODO
115     */
116    void removeSession(ConnectionContext context, SessionInfo info) throws Exception;
117
118    /**
119     * Adds a producer.
120     *
121     * @param context the enviorment the operation is being executed under.
122     * @throws Exception TODO
123     */
124    @Override
125    void addProducer(ConnectionContext context, ProducerInfo info) throws Exception;
126
127    /**
128     * Removes a producer.
129     *
130     * @param context the enviorment the operation is being executed under.
131     * @throws Exception TODO
132     */
133    @Override
134    void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception;
135
136    /**
137     * @return all clients added to the Broker.
138     * @throws Exception TODO
139     */
140    Connection[] getClients() throws Exception;
141
142    /**
143     * @return all destinations added to the Broker.
144     * @throws Exception TODO
145     */
146    ActiveMQDestination[] getDestinations() throws Exception;
147
148    /**
149     * return a reference destination map of a region based on the destination type
150     * @param destination
151     * @return
152     */
153    public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination);
154
155    /**
156     * Gets a list of all the prepared xa transactions.
157     *
158     * @param context transaction ids
159     * @return
160     * @throws Exception TODO
161     */
162    TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception;
163
164    /**
165     * Starts a transaction.
166     *
167     * @param context
168     * @param xid
169     * @throws Exception TODO
170     */
171    void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception;
172
173    /**
174     * Prepares a transaction. Only valid for xa transactions.
175     *
176     * @param context
177     * @param xid
178     * @return id
179     * @throws Exception TODO
180     */
181    int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception;
182
183    /**
184     * Rollsback a transaction.
185     *
186     * @param context
187     * @param xid
188     * @throws Exception TODO
189     */
190
191    void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception;
192
193    /**
194     * Commits a transaction.
195     *
196     * @param context
197     * @param xid
198     * @param onePhase
199     * @throws Exception TODO
200     */
201    void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception;
202
203    /**
204     * Forgets a transaction.
205     *
206     * @param context
207     * @param transactionId
208     * @throws Exception
209     */
210    void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception;
211
212    /**
213     * Get the BrokerInfo's of any connected Brokers
214     *
215     * @return array of peer BrokerInfos
216     */
217    BrokerInfo[] getPeerBrokerInfos();
218
219    /**
220     * Notify the Broker that a dispatch is going to happen
221     *
222     * @param messageDispatch
223     */
224    void preProcessDispatch(MessageDispatch messageDispatch);
225
226    /**
227     * Notify the Broker that a dispatch has happened
228     *
229     * @param messageDispatch
230     */
231    void postProcessDispatch(MessageDispatch messageDispatch);
232
233    /**
234     * @return true if the broker has stopped
235     */
236    boolean isStopped();
237
238    /**
239     * @return a Set of all durable destinations
240     */
241    Set<ActiveMQDestination> getDurableDestinations();
242
243    /**
244     * Add and process a DestinationInfo object
245     *
246     * @param context
247     * @param info
248     * @throws Exception
249     */
250    void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
251
252    /**
253     * Remove and process a DestinationInfo object
254     *
255     * @param context
256     * @param info
257     * @throws Exception
258     */
259    void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
260
261    /**
262     * @return true if fault tolerant
263     */
264    boolean isFaultTolerantConfiguration();
265
266    /**
267     * @return the connection context used to make administration operations on
268     *         startup or via JMX MBeans
269     */
270    ConnectionContext getAdminConnectionContext();
271
272    /**
273     * Sets the default administration connection context used when configuring
274     * the broker on startup or via JMX
275     *
276     * @param adminConnectionContext
277     */
278    void setAdminConnectionContext(ConnectionContext adminConnectionContext);
279
280    /**
281     * @return the temp data store
282     */
283    PListStore getTempDataStore();
284
285    /**
286     * @return the URI that can be used to connect to the local Broker
287     */
288    URI getVmConnectorURI();
289
290    /**
291     * called when the brokerService starts
292     */
293    void brokerServiceStarted();
294
295    /**
296     * @return the BrokerService
297     */
298    BrokerService getBrokerService();
299
300    /**
301     * Ensure we get the Broker at the top of the Stack
302     *
303     * @return the broker at the top of the Stack
304     */
305    Broker getRoot();
306
307    /**
308     * Determine if a message has expired -allows default behaviour to be
309     * overriden - as the timestamp set by the producer can be out of sync with
310     * the broker
311     *
312     * @param messageReference
313     * @return true if the message is expired
314     */
315    boolean isExpired(MessageReference messageReference);
316
317    /**
318     * A Message has Expired
319     *
320     * @param context
321     * @param messageReference
322     * @param subscription, may be null
323     */
324    void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription);
325
326    /**
327     * A message needs to go the a DLQ
328     *
329     *
330     * @param context
331     * @param messageReference
332     * @param poisonCause reason for dlq submission, may be null
333     * @return true if Message was placed in a DLQ false if discarded.
334     */
335    boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause);
336
337    /**
338     * @return the broker sequence id
339     */
340    long getBrokerSequenceId();
341
342    /**
343     * called when message is consumed
344     * @param context
345     * @param messageReference
346     */
347    void messageConsumed(ConnectionContext context, MessageReference messageReference);
348
349    /**
350     * Called when message is delivered to the broker
351     * @param context
352     * @param messageReference
353     */
354    void messageDelivered(ConnectionContext context, MessageReference messageReference);
355
356    /**
357     * Called when a message is discarded - e.g. running low on memory
358     * This will happen only if the policy is enabled - e.g. non durable topics
359     * @param context
360     * @param sub
361     * @param messageReference
362     */
363    void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference);
364
365    /**
366     * Called when there is a slow consumer
367     * @param context
368     * @param destination
369     * @param subs
370     */
371    void slowConsumer(ConnectionContext context,Destination destination, Subscription subs);
372
373    /**
374     * Called to notify a producer is too fast
375     * @param context
376     * @param producerInfo
377     * @param destination
378     */
379    void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination);
380
381    /**
382     * Called when a Usage reaches a limit
383     * @param context
384     * @param destination
385     * @param usage
386     */
387    void isFull(ConnectionContext context,Destination destination,Usage usage);
388
389    void virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination);
390
391    void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination);
392
393    /**
394     *  called when the broker becomes the master in a master/slave
395     *  configuration
396     */
397    void nowMasterBroker();
398
399    Scheduler getScheduler();
400
401    ThreadPoolExecutor getExecutor();
402
403    void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp);
404
405    void networkBridgeStopped(BrokerInfo brokerInfo);
406
407
408}